import asyncio
import datetime
import re
import sys
import traceback
from collections import namedtuple
from urllib.parse import parse_qs

import aiohttp
import yaml
from aiohttp import web, ClientTimeout, TCPConnector

# One cached conversion result together with the moment it was stored.
CacheEntry = namedtuple('CacheEntry', ['data', 'timestamp'])


class CustomCache:
    """In-memory key/value store whose entries expire after ``ttl`` seconds.

    Expired entries are not evicted; they stay in ``self.cache`` until they
    are overwritten by ``set``.
    """

    def __init__(self, ttl=1800):
        self.cache = {}
        self.ttl = ttl

    def is_fresh(self, entry):
        """Return True when ``entry`` was stored less than ``ttl`` seconds ago."""
        age = datetime.datetime.now() - entry.timestamp
        return age.total_seconds() < self.ttl

    def get(self, key, allow_stale=False):
        """Return the ``CacheEntry`` stored under ``key``, or ``None``.

        By default an expired entry is reported as ``None``. With
        ``allow_stale=True`` an expired entry is returned as well, so callers
        can fall back to it when refreshing fails.
        """
        entry = self.cache.get(key)
        if entry is None:
            return None
        if allow_stale or self.is_fresh(entry):
            return entry
        return None

    def set(self, key, value):
        """Store ``value`` under ``key`` stamped with the current time."""
        self.cache[key] = CacheEntry(value, datetime.datetime.now())


cache = CustomCache(ttl=1800)  # 30 minutes cache

CHROME_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"

# Messages returned by extract_and_transform_proxies() in place of a result.
NO_PROXIES_MESSAGE = "未找到有效的代理配置"
YAML_ERROR_MESSAGE = "YAML解析错误"
CONVERSION_ERRORS = (NO_PROXIES_MESSAGE, YAML_ERROR_MESSAGE)

# Proxy types this converter knows how to render.
SUPPORTED_PROXY_TYPES = ('ss', 'trojan')

# Fallback matcher for a "proxies:" section made of flow-style ("{...}") items.
_PROXIES_BLOCK_RE = re.compile(r'proxies:\s*\n((?:[-\s]*{.*\n?)*)', re.MULTILINE)

# Body served when no "url" query parameter is supplied.
USAGE_GUIDE = """

代理配置转换工具

使用方法:在URL参数中提供包含代理配置的网址。

示例:http://localhost:8080/?url=https://example.com/path-to-proxy-config

强制获取新数据:http://localhost:8080/?url=https://example.com/path-to-proxy-config&nocache

"""


async def fetch_url(url, session, max_retries=3):
    """Download ``url`` with ``session`` and return the body decoded as UTF-8.

    Makes up to ``max_retries`` attempts, sleeping one second between them.
    Undecodable bytes are dropped.

    Raises:
        ValueError: if ``max_retries`` is smaller than 1.
        aiohttp.ClientError, asyncio.TimeoutError: the failure of the last
            attempt, once every attempt has failed.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    headers = {"User-Agent": CHROME_USER_AGENT}
    timeout = ClientTimeout(total=40)
    for attempt in range(1, max_retries + 1):
        try:
            async with session.get(url, headers=headers, timeout=timeout) as response:
                response.raise_for_status()
                content = await response.read()
                return content.decode('utf-8', errors='ignore')
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # A total timeout surfaces as asyncio.TimeoutError, which is not
            # an aiohttp.ClientError; it has to be listed to be retried.
            # TimeoutError often has an empty message, so fall back to its name.
            print(f"Attempt {attempt} failed: {str(e) or type(e).__name__}", flush=True)
            if attempt == max_retries:
                raise
            await asyncio.sleep(1)


def _clean(value):
    """Return ``value`` as a stripped string; ``None`` becomes ``''``.

    YAML scalars may be parsed as int/float/bool/None (for example a numeric
    password), so they cannot be assumed to have ``.strip()``.
    """
    if value is None:
        return ''
    return str(value).strip()


def _format_proxy(proxy):
    """Render one 'ss' or 'trojan' proxy mapping as a single output line."""
    proxy_type = proxy['type']
    name = _clean(proxy.get('name', ''))
    server = _clean(proxy.get('server', ''))
    port = _clean(proxy.get('port', ''))
    parts = [f"{name} = {proxy_type}, {server}, {port}"]

    if proxy_type == 'ss':
        if 'cipher' in proxy:
            parts.append(f"encrypt-method={_clean(proxy['cipher'])}")
        if 'password' in proxy:
            parts.append(f"password={_clean(proxy['password'])}")
    elif proxy_type == 'trojan':
        if 'password' in proxy:
            parts.append(f"password={_clean(proxy['password'])}")
        if 'sni' in proxy:
            parts.append(f"sni={_clean(proxy['sni'])}")
        if 'skip-cert-verify' in proxy:
            parts.append(f"skip-cert-verify={str(proxy['skip-cert-verify']).lower()}")

    if 'udp' in proxy:
        parts.append(f"udp-relay={'true' if proxy['udp'] in [True, 'true', 'True'] else 'false'}")

    return ", ".join(parts)


async def extract_and_transform_proxies(input_text):
    """Convert a YAML proxy list into one text line per supported proxy.

    ``input_text`` may be a mapping with a ``proxies`` key, a bare list, or
    text containing a ``proxies:`` section of flow-style items. Only entries
    whose ``type`` is 'ss' or 'trojan' are rendered; everything else is
    skipped.

    Returns:
        The rendered lines joined by newlines (an empty string when no entry
        is supported), or one of the two messages in ``CONVERSION_ERRORS``
        when the input cannot be parsed or holds no proxy list.
    """
    try:
        data = yaml.safe_load(input_text)
        if isinstance(data, dict) and 'proxies' in data:
            proxies_list = data['proxies']
        elif isinstance(data, list):
            proxies_list = data
        else:
            proxies_match = _PROXIES_BLOCK_RE.search(input_text)
            if not proxies_match:
                return NO_PROXIES_MESSAGE
            proxies_list = yaml.safe_load(proxies_match.group(1))
    except yaml.YAMLError:
        return YAML_ERROR_MESSAGE

    # A "proxies" value that is empty or not a list cannot be iterated as
    # proxy mappings.
    if not proxies_list or not isinstance(proxies_list, list):
        return NO_PROXIES_MESSAGE

    transformed_proxies = [
        _format_proxy(proxy)
        for proxy in proxies_list
        # Skip scalars/None mixed into the list instead of crashing on .get().
        if isinstance(proxy, dict) and proxy.get('type') in SUPPORTED_PROXY_TYPES
    ]
    return "\n".join(transformed_proxies)


def get_client_ip(request):
    """Return the client address for ``request``.

    The first non-empty header among the usual proxy headers wins (taking the
    first element of a comma-separated list); otherwise ``request.remote`` is
    returned. These headers are supplied by the client or by upstream proxies
    and are used here as-is.
    """
    for header in (
        'X-Forwarded-For',
        'X-Real-IP',
        'CF-Connecting-IP',
        'True-Client-IP',
        'X-Client-IP',
    ):
        forwarded = request.headers.get(header)
        if forwarded:
            return forwarded.split(',')[0].strip()
    return request.remote


async def handle_request(request):
    """Serve ``GET /``: convert the proxy config found at ``?url=...``.

    Without a ``url`` parameter the usage guide is returned. With ``nocache``
    present (with or without a value) the cache is bypassed. Otherwise a fresh
    cache entry is served directly; an expired entry is refreshed and is only
    served if the refresh fails or yields no usable result.

    The response carries X-Proxy-Count, X-Cache-Hit, X-Cache-Time, X-New-Data
    and X-No-Cache headers, which ``logging_middleware`` reads.
    """
    if request.path != '/':
        return web.Response(text="Not Found", status=404)

    # keep_blank_values=True: parse_qs drops valueless parameters by default,
    # which made the documented bare "&nocache" flag invisible.
    query_params = parse_qs(request.query_string, keep_blank_values=True)
    url = query_params.get('url', [''])[0]
    if not url:
        return web.Response(text=USAGE_GUIDE, content_type='text/html')

    no_cache = 'nocache' in query_params
    # Expired entries are fetched too so they can act as a fallback below.
    cache_entry = None if no_cache else cache.get(url, allow_stale=True)

    result = None
    cache_time = None
    cache_hit = False  # True when the response body comes from the cache
    new_data = False
    if cache_entry is not None:
        result = cache_entry.data
        cache_time = cache_entry.timestamp
        cache_hit = True

    if cache_entry is None or not cache.is_fresh(cache_entry):
        try:
            # SECURITY NOTE(review): ssl=False disables TLS certificate
            # verification, and ``url`` is caller-supplied, so this server
            # will fetch arbitrary addresses. Left unchanged on purpose;
            # confirm both are acceptable for the deployment.
            async with aiohttp.ClientSession(connector=TCPConnector(ssl=False)) as session:
                input_text = await fetch_url(url, session)
            new_result = await extract_and_transform_proxies(input_text)
        except Exception as e:
            print(f"Error processing request: {str(e)}", flush=True)
            traceback.print_exc()
            if cache_entry is None:
                return web.Response(text=f"Error: {str(e)}", status=500)
            # Otherwise fall through and serve the expired entry.
        else:
            if new_result not in CONVERSION_ERRORS:
                result = new_result
                cache.set(url, result)
                cache_time = datetime.datetime.now()
                cache_hit = False
                new_data = True
            elif cache_entry is None:
                # Nothing cached to fall back on: report the message itself.
                result = new_result
                cache_time = datetime.datetime.now()

    # Neither error message counts as a proxy line.
    if result and result not in CONVERSION_ERRORS:
        proxy_count = result.count('\n') + 1
    else:
        proxy_count = 0

    return web.Response(text=result, content_type='text/plain', headers={
        'X-Proxy-Count': str(proxy_count),
        'X-Cache-Hit': str(cache_hit),
        'X-Cache-Time': cache_time.strftime('%Y-%m-%d %H:%M:%S'),
        'X-New-Data': str(new_data),
        'X-No-Cache': str(no_cache)
    })


def _log_access(request, response):
    """Print one access-log line built from the request and response headers."""
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    client_ip = get_client_ip(request)
    target_url = request.query.get('url', '-')
    status_code = response.status
    proxy_count = response.headers.get('X-Proxy-Count', '0')
    cache_hit = "Hit" if response.headers.get('X-Cache-Hit') == 'True' else "Miss"
    cache_time = response.headers.get('X-Cache-Time', '-')
    new_data = "Yes" if response.headers.get('X-New-Data') == 'True' else "No"
    no_cache = "Yes" if response.headers.get('X-No-Cache') == 'True' else "No"
    log_message = f"{timestamp} - {client_ip} - \"GET /?url={target_url}\" - Status: {status_code} - Proxies: {proxy_count} - Cache: {cache_hit} - CacheTime: {cache_time} - NewData: {new_data} - NoCache: {no_cache}"
    print(log_message, flush=True)


@web.middleware
async def logging_middleware(request, handler):
    """Log every request and turn unexpected errors into a 500 response.

    aiohttp's own HTTP exceptions (such as the 404 raised for an unknown
    path) are logged and re-raised so the client still receives their real
    status.
    """
    start_time = datetime.datetime.now()
    try:
        response = await handler(request)
        _log_access(request, response)
        return response
    except web.HTTPException as http_error:
        # These are deliberate HTTP responses, not failures; catching them
        # with the generic handler below would rewrite them as 500.
        _log_access(request, http_error)
        raise
    except Exception as e:
        end_time = datetime.datetime.now()
        print(f"Error occurred: {str(e)}", flush=True)
        print(f"Request processing time: {end_time - start_time}", flush=True)
        print("Traceback:", flush=True)
        traceback.print_exc()
        return web.Response(text=f"Internal Server Error: {str(e)}", status=500)


async def init_app():
    """Build the aiohttp application with the logging middleware and '/' route."""
    app = web.Application(middlewares=[logging_middleware])
    app.router.add_get('/', handle_request)
    return app


if __name__ == "__main__":
    print(f"===== Application Startup at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} =====")
    print("Server running on port 8080")
    # print=lambda _: None silences aiohttp's own startup banner.
    web.run_app(init_app(), port=8080, print=lambda _: None)