import re
import asyncio
import datetime
import sys
import traceback
from urllib.parse import parse_qs

import aiohttp
import yaml
from aiohttp import web, ClientTimeout, TCPConnector
from cachetools import TTLCache

# TTL cache: at most 1000 entries, each entry valid for one hour.
cache = TTLCache(maxsize=1000, ttl=3600)


async def fetch_url(url, session, max_retries=3, timeout=180):
    """Fetch ``url`` with ``session``, retrying on timeouts and client errors.

    Args:
        url: URL to download.
        session: ``aiohttp.ClientSession`` used to issue the GET request.
        max_retries: Total number of attempts before giving up.
        timeout: Per-attempt total timeout, in seconds.

    Returns:
        The response body decoded as text.

    Raises:
        Exception: If every attempt fails; chained from the last underlying
            error so the root cause is preserved in the traceback.
    """
    last_error = None
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=ClientTimeout(total=timeout)) as response:
                response.raise_for_status()
                return await response.text()
        except asyncio.TimeoutError as e:
            last_error = e
            print(f"Attempt {attempt + 1} timed out after {timeout} seconds", flush=True)
        except aiohttp.ClientError as e:
            last_error = e
            print(f"Attempt {attempt + 1} failed: {str(e)}", flush=True)
        if attempt < max_retries - 1:
            await asyncio.sleep(5)  # back off before the next attempt
    # Fix: chain from the last underlying error instead of discarding it.
    raise Exception(f"Failed to fetch URL after {max_retries} attempts") from last_error


def _udp_flag(value):
    """Map a YAML ``udp`` value to the literal 'true'/'false' the output format uses."""
    return 'true' if value in (True, 'true', 'True') else 'false'


def _transform_ss(proxy):
    """Render one Shadowsocks proxy dict as a ``name = ss, server, port, ...`` line."""
    name = proxy.get('name', '').strip()
    server = proxy.get('server', '').strip()
    port = str(proxy.get('port', '')).strip()
    parts = [f"{name} = ss, {server}, {port}"]
    if 'cipher' in proxy:
        parts.append(f"encrypt-method={proxy['cipher'].strip()}")
    if 'password' in proxy:
        parts.append(f"password={proxy['password'].strip()}")
    if 'udp' in proxy:
        parts.append(f"udp-relay={_udp_flag(proxy['udp'])}")
    return ", ".join(parts)


def _transform_trojan(proxy):
    """Render one Trojan proxy dict as a ``name = trojan, server, port, ...`` line."""
    name = proxy.get('name', '').strip()
    server = proxy.get('server', '').strip()
    port = str(proxy.get('port', '')).strip()
    parts = [f"{name} = trojan, {server}, {port}"]
    if 'password' in proxy:
        parts.append(f"password={proxy['password'].strip()}")
    if 'sni' in proxy:
        parts.append(f"sni={proxy['sni'].strip()}")
    if 'skip-cert-verify' in proxy:
        parts.append(f"skip-cert-verify={str(proxy['skip-cert-verify']).lower()}")
    if 'udp' in proxy:
        parts.append(f"udp={_udp_flag(proxy['udp'])}")
    return ", ".join(parts)


async def extract_and_transform_proxies(input_text):
    """Parse a Clash-style YAML document and convert its proxies.

    Only ``ss`` and ``trojan`` entries are converted, one output line per
    proxy. On parse failure or when nothing usable is found, a (Chinese)
    error-message string is returned instead of raising.

    Args:
        input_text: Raw text of the fetched configuration.

    Returns:
        Converted proxy lines joined with newlines, or an error message.
    """
    try:
        # First try to parse the entire input as YAML.
        data = yaml.safe_load(input_text)
        if isinstance(data, dict) and 'proxies' in data:
            proxies_list = data['proxies']
        elif isinstance(data, list):
            proxies_list = data
        else:
            # Unexpected shape: try to carve out just the proxies section.
            proxies_match = re.search(r'proxies:\s*\n((?:[-\s]*{.*\n?)*)', input_text, re.MULTILINE | re.DOTALL)
            if proxies_match:
                proxies_list = yaml.safe_load(proxies_match.group(1))
            else:
                return "未找到有效的代理配置"
    except yaml.YAMLError as e:
        return f"YAML解析错误: {str(e)}"

    if not proxies_list:
        return "未找到有效的代理配置"

    transformed_proxies = []
    for proxy in proxies_list:
        # Fix: malformed YAML can yield non-dict entries (e.g. bare strings);
        # skip them instead of crashing on .get().
        if not isinstance(proxy, dict):
            continue
        proxy_type = proxy.get('type')
        if proxy_type == 'ss':
            transformed_proxies.append(_transform_ss(proxy))
        elif proxy_type == 'trojan':
            transformed_proxies.append(_transform_trojan(proxy))

    return "\n".join(transformed_proxies) if transformed_proxies else "未找到有效的SS或Trojan代理配置"


async def log_request(request, response):
    """Print one access-log line: timestamp, client IP, request line, status, length."""
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    client_ip = request.remote
    request_line = f"{request.method} {request.path}"
    if request.query_string:
        request_line += f"?{request.query_string}"
    status_code = response.status
    content_length = response.content_length
    log_message = f"{timestamp} - {client_ip} - \"{request_line}\" {status_code} {content_length}"
    print(log_message, flush=True)


@web.middleware
async def logging_middleware(request, handler):
    """aiohttp middleware: log every request/response and time its processing.

    Unhandled exceptions are logged with a traceback and converted into a
    plain-text 500 response so the server keeps running.
    """
    start_time = datetime.datetime.now()
    try:
        response = await handler(request)
        await log_request(request, response)
        end_time = datetime.datetime.now()
        print(f"Request processing time: {end_time - start_time}", flush=True)
        return response
    except Exception as e:
        # Broad catch is intentional at this top-level boundary: report and answer 500.
        end_time = datetime.datetime.now()
        print(f"Error occurred: {str(e)}", flush=True)
        print(f"Request processing time: {end_time - start_time}", flush=True)
        print("Traceback:", flush=True)
        traceback.print_exc()
        return web.Response(text=f"Internal Server Error: {str(e)}", status=500)
query_params: url = query_params['url'][0] # 检查缓存 if url in cache: print(f"Cache hit for URL: {url}", flush=True) return web.Response(text=cache[url], content_type='text/plain') try: print(f"Fetching URL: {url}", flush=True) async with aiohttp.ClientSession(connector=TCPConnector(ssl=False)) as session: input_text = await fetch_url(url, session, max_retries=3, timeout=180) print(f"URL content length: {len(input_text)}", flush=True) result = await extract_and_transform_proxies(input_text) print(f"Transformed result length: {len(result)}", flush=True) # 将结果存入缓存 cache[url] = result return web.Response(text=result, content_type='text/plain') except Exception as e: error_message = f"Error processing request: {str(e)}\n{traceback.format_exc()}" print(error_message, flush=True) return web.Response(text=error_message, status=500) else: usage_guide = """
使用方法:在URL参数中提供包含代理配置的网址。
示例:http://localhost:8080/?url=https://example.com/path-to-proxy-config