import re import json import yaml import aiohttp import asyncio import datetime import sys import traceback from aiohttp import web, ClientTimeout, TCPConnector, ClientError, ServerTimeoutError, TooManyRedirects from collections import namedtuple CacheEntry = namedtuple('CacheEntry', ['data', 'timestamp']) class CustomCache: def __init__(self, ttl=1800): self.cache = {} self.ttl = ttl def get(self, key): if key in self.cache: entry = self.cache[key] if ( - entry.timestamp).total_seconds() < self.ttl: return entry return None def set(self, key, value): self.cache[key] = CacheEntry(value, cache = CustomCache(ttl=1800) # 30 minutes cache CHROME_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" async def fetch_url(url, session, max_retries=5): headers = {"User-Agent": CHROME_USER_AGENT} for attempt in range(max_retries): try: timeout = ClientTimeout(total=40 * (attempt + 1)) # 递增的超时时间 async with session.get(url, headers=headers, timeout=timeout) as response: response.raise_for_status() content = await return content.decode('utf-8', errors='ignore') except (ClientError, asyncio.TimeoutError, ServerTimeoutError, TooManyRedirects) as e: wait_time = 2 ** attempt # 指数退避 print(f"Attempt {attempt + 1} failed: {str(e)}. Retrying in {wait_time} seconds...", flush=True) if attempt == max_retries - 1: raise await asyncio.sleep(wait_time) async def extract_and_transform_proxies(input_text): try: # 首先尝试解析为JSON try: config = json.loads(input_text) # 检查是否是完整的配置文件格式 if 'outbounds' in config: proxies_list = [outbound for outbound in config['outbounds'] if outbound.get('type') in ['ss', 'shadowsocks', 'trojan', 'hysteria2']] else: # 如果不是完整配置文件,假设它是代理列表 proxies_list = config if isinstance(config, list) else [config] except json.JSONDecodeError: # 如果不是JSON,尝试YAML格式 try: data = yaml.safe_load(input_text) if isinstance(data, dict) and 'proxies' in data: proxies_list = data['proxies'] elif isinstance(data, list): proxies_list = data else: proxies_match ='proxies:\s*\n((?:[-\s]*{.*\n?)*)', input_text, re.MULTILINE) if proxies_match: proxies_text = proxies_list = yaml.safe_load(proxies_text) else: return "未找到有效的代理配置" except yaml.YAMLError: return "YAML解析错误" if not proxies_list: return "未找到有效的代理配置" transformed_proxies = [] for proxy in proxies_list: proxy_type = proxy.get('type') if proxy_type not in ['ss', 'shadowsocks', 'trojan', 'hysteria2']: continue name = proxy.get('tag', '') or proxy.get('name', '') name = name.strip() server = proxy.get('server', '').strip() port = str(proxy.get('server_port', '') or proxy.get('port', '')).strip() if proxy_type == 'shadowsocks': proxy_type = 'ss' # 将 shadowsocks 转换为 ss parts = [f"{name} = {proxy_type}, {server}, {port}"] if proxy_type == 'ss': if 'method' in proxy: parts.append(f"encrypt-method={proxy['method'].strip()}") elif 'cipher' in proxy: parts.append(f"encrypt-method={proxy['cipher'].strip()}") if 'password' in proxy: parts.append(f"password={proxy['password'].strip()}") elif proxy_type in ['trojan', 'hysteria2']: if 'password' in proxy: parts.append(f"password={proxy['password'].strip()}") # 处理TLS配置 tls_config = proxy.get('tls', {}) if isinstance(tls_config, dict): if tls_config.get('insecure', False): parts.append("skip-cert-verify=true") if 'server_name' in tls_config: parts.append(f"sni={tls_config['server_name'].strip()}") elif 'skip-cert-verify' in proxy: parts.append(f"skip-cert-verify={str(proxy['skip-cert-verify']).lower()}") elif proxy_type == 'hysteria2': parts.append(f"skip-cert-verify=true") if 'sni' in proxy: parts.append(f"sni={proxy['sni'].strip()}") # 处理TCP Fast Open配置 if 'tcp_fast_open' in proxy: parts.append(f"tfo={str(proxy['tcp_fast_open']).lower()}") # 统一处理 udp-relay,避免重复 if 'udp' in proxy: parts.append(f"udp-relay={'true' if proxy['udp'] in [True, 'true', 'True'] else 'false'}") elif proxy_type == 'ss': parts.append("udp-relay=true") transformed_proxies.append(", ".join(parts)) return "\n".join(transformed_proxies) if transformed_proxies else "未找到有效的SS、Trojan或Hysteria2代理配置" except Exception as e: print(f"Error in extract_and_transform_proxies: {str(e)}", flush=True) return f"配置解析错误: {str(e)}" def get_client_ip(request): headers_to_check = [ 'X-Forwarded-For', 'X-Real-IP', 'CF-Connecting-IP', 'True-Client-IP', 'X-Client-IP', ] for header in headers_to_check: ip = request.headers.get(header) if ip: return ip.split(',')[0].strip() return request.remote async def handle_request(request): if request.path == '/': if 'url' in request.query: url = request.query['url'] no_cache = 'nocache' in request.query cache_entry = None if no_cache else cache.get(url) cache_hit = False if cache_entry and not no_cache: result = cache_hit = True cache_time = cache_entry.timestamp if not cache_hit or no_cache: try: async with aiohttp.ClientSession(connector=TCPConnector(ssl=False)) as session: input_text = await fetch_url(url, session) new_result = await extract_and_transform_proxies(input_text) if new_result != "未找到有效的代理配置" and new_result != "YAML解析错误": result = new_result cache.set(url, result) cache_time = elif not cache_hit: result = new_result cache_time = except Exception as e: error_message = f"Error processing request: {str(e)}" print(error_message, flush=True) traceback.print_exc() if cache_entry: print("Using cached data due to error", flush=True) result = cache_time = cache_entry.timestamp cache_hit = True else: return web.Response(text=error_message, status=500) proxy_count = result.count('\n') + 1 if result and result != "未找到有效的代理配置" else 0 return web.Response(text=result, content_type='text/plain', headers={ 'X-Proxy-Count': str(proxy_count), 'X-Cache-Hit': str(cache_hit), 'X-Cache-Time': cache_time.strftime('%Y-%m-%d %H:%M:%S'), 'X-No-Cache': str(no_cache) }) else: usage_guide = """