import re
import json
import asyncio
import datetime
import traceback
from collections import namedtuple

import aiohttp
import yaml
from aiohttp import web, ClientTimeout, TCPConnector, ClientError, ServerTimeoutError, TooManyRedirects

# A cached transformation result together with the time it was stored.
CacheEntry = namedtuple('CacheEntry', ['data', 'timestamp'])


class CustomCache:
    """Minimal in-memory cache whose entries expire after `ttl` seconds."""

    def __init__(self, ttl=1800):
        self.cache = {}
        self.ttl = ttl

    def get(self, key):
        # Return the CacheEntry if it exists and is still fresh, otherwise None.
        if key in self.cache:
            entry = self.cache[key]
            if (datetime.datetime.now() - entry.timestamp).total_seconds() < self.ttl:
                return entry
        return None

    def set(self, key, value):
        self.cache[key] = CacheEntry(value, datetime.datetime.now())


cache = CustomCache(ttl=1800)
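
# Illustrative cache behaviour (not executed here): cache.set(url, text) stores a
# CacheEntry, and cache.get(url) keeps returning that entry for up to 1800 seconds,
# after which get() returns None and the lookup counts as a miss.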


CHROME_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"


async def fetch_url(url, session, max_retries=5):
    """Fetch `url` with retries, exponential back-off and a growing timeout."""
    headers = {"User-Agent": CHROME_USER_AGENT}
    for attempt in range(max_retries):
        try:
            # Give slower sources progressively more time on each retry.
            timeout = ClientTimeout(total=40 * (attempt + 1))
            async with session.get(url, headers=headers, timeout=timeout) as response:
                response.raise_for_status()
                content = await response.read()
                return content.decode('utf-8', errors='ignore')
        except (ClientError, asyncio.TimeoutError, ServerTimeoutError, TooManyRedirects) as e:
            if attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt
            print(f"Attempt {attempt + 1} failed: {str(e)}. Retrying in {wait_time} seconds...", flush=True)
            await asyncio.sleep(wait_time)
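
# With the defaults above, a failing URL is tried up to 5 times with per-attempt
# total timeouts of 40, 80, 120, 160 and 200 seconds and back-off waits of
# 1, 2, 4 and 8 seconds between attempts; the final failure re-raises the exception.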


async def extract_and_transform_proxies(input_text):
    """Parse a sing-box (JSON) or Clash (YAML) document and return Surge-style proxy lines."""
    try:
        # First try sing-box style JSON.
        try:
            config = json.loads(input_text)
            if 'outbounds' in config:
                proxies_list = [outbound for outbound in config['outbounds']
                                if outbound.get('type') in ['ss', 'shadowsocks', 'trojan', 'hysteria2']]
            else:
                proxies_list = config if isinstance(config, list) else [config]
        except json.JSONDecodeError:
            # Fall back to Clash style YAML.
            try:
                data = yaml.safe_load(input_text)
                if isinstance(data, dict) and 'proxies' in data:
                    proxies_list = data['proxies']
                elif isinstance(data, list):
                    proxies_list = data
                else:
                    # Last resort: pull an inline-flow `proxies:` block out of the raw text.
                    proxies_match = re.search(r'proxies:\s*\n((?:[-\s]*{.*\n?)*)', input_text, re.MULTILINE)
                    if proxies_match:
                        proxies_text = proxies_match.group(1)
                        proxies_list = yaml.safe_load(proxies_text)
                    else:
                        return "未找到有效的代理配置"
            except yaml.YAMLError:
                return "YAML解析错误"

        if not proxies_list:
            return "未找到有效的代理配置"

        transformed_proxies = []

        for proxy in proxies_list:
            proxy_type = proxy.get('type')
            if proxy_type not in ['ss', 'shadowsocks', 'trojan', 'hysteria2']:
                continue

            # sing-box uses tag/server_port, Clash uses name/port.
            name = (proxy.get('tag', '') or proxy.get('name', '')).strip()
            server = proxy.get('server', '').strip()
            port = str(proxy.get('server_port', '') or proxy.get('port', '')).strip()

            if proxy_type == 'shadowsocks':
                proxy_type = 'ss'

            parts = [f"{name} = {proxy_type}, {server}, {port}"]

            if proxy_type == 'ss':
                if 'method' in proxy:
                    parts.append(f"encrypt-method={proxy['method'].strip()}")
                elif 'cipher' in proxy:
                    parts.append(f"encrypt-method={proxy['cipher'].strip()}")
                if 'password' in proxy:
                    parts.append(f"password={proxy['password'].strip()}")
                if 'udp' not in proxy:
                    # Default UDP relay on for Shadowsocks; an explicit `udp` flag is handled below.
                    parts.append("udp-relay=true")
            elif proxy_type in ['trojan', 'hysteria2']:
                if 'password' in proxy:
                    parts.append(f"password={proxy['password'].strip()}")

                # sing-box nests TLS options in a dict; Clash keeps them at the top level.
                tls_config = proxy.get('tls')
                if isinstance(tls_config, dict):
                    if tls_config.get('insecure', False):
                        parts.append("skip-cert-verify=true")
                    if 'server_name' in tls_config:
                        parts.append(f"sni={tls_config['server_name'].strip()}")
                elif 'skip-cert-verify' in proxy:
                    parts.append(f"skip-cert-verify={str(proxy['skip-cert-verify']).lower()}")
                elif proxy_type == 'hysteria2':
                    parts.append("skip-cert-verify=true")

                if 'sni' in proxy:
                    parts.append(f"sni={proxy['sni'].strip()}")

            if 'tcp_fast_open' in proxy:
                parts.append(f"tfo={str(proxy['tcp_fast_open']).lower()}")

            if 'udp' in proxy:
                parts.append(f"udp-relay={'true' if proxy['udp'] in [True, 'true', 'True'] else 'false'}")

            transformed_proxies.append(", ".join(parts))

        return "\n".join(transformed_proxies) if transformed_proxies else "未找到有效的SS、Trojan或Hysteria2代理配置"
    except Exception as e:
        print(f"Error in extract_and_transform_proxies: {str(e)}", flush=True)
        return f"配置解析错误: {str(e)}"


def get_client_ip(request):
    """Best-effort client IP: check common proxy/CDN headers before request.remote."""
    headers_to_check = [
        'X-Forwarded-For',
        'X-Real-IP',
        'CF-Connecting-IP',
        'True-Client-IP',
        'X-Client-IP',
    ]
    for header in headers_to_check:
        ip = request.headers.get(header)
        if ip:
            # X-Forwarded-For may carry a chain; the first entry is the original client.
            return ip.split(',')[0].strip()
    return request.remote
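
# For example, a request carrying "X-Forwarded-For: 203.0.113.7, 10.0.0.1"
# would be logged with client IP 203.0.113.7 (addresses here are illustrative).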


async def handle_request(request):
    if request.path == '/':
        if 'url' in request.query:
            url = request.query['url']
            no_cache = 'nocache' in request.query
            # Always look the URL up so a cached copy can serve as a fallback if a
            # (forced) refresh fails; serve it directly only when the client did not
            # ask for nocache and the entry is still fresh.
            cache_entry = cache.get(url)
            cache_hit = False

            if cache_entry and not no_cache:
                result = cache_entry.data
                cache_hit = True
                cache_time = cache_entry.timestamp

            if not cache_hit:
                try:
                    async with aiohttp.ClientSession(connector=TCPConnector(ssl=False)) as session:
                        input_text = await fetch_url(url, session)
                        new_result = await extract_and_transform_proxies(input_text)
                        if new_result != "未找到有效的代理配置" and new_result != "YAML解析错误":
                            result = new_result
                            cache.set(url, result)
                        else:
                            # Keep the error text as the response but do not cache it.
                            result = new_result
                        cache_time = datetime.datetime.now()
                except Exception as e:
                    error_message = f"Error processing request: {str(e)}"
                    print(error_message, flush=True)
                    traceback.print_exc()
                    if cache_entry:
                        print("Using cached data due to error", flush=True)
                        result = cache_entry.data
                        cache_time = cache_entry.timestamp
                        cache_hit = True
                    else:
                        return web.Response(text=error_message, status=500)

            proxy_count = result.count('\n') + 1 if result and result != "未找到有效的代理配置" else 0
            return web.Response(text=result, content_type='text/plain', headers={
                'X-Proxy-Count': str(proxy_count),
                'X-Cache-Hit': str(cache_hit),
                'X-Cache-Time': cache_time.strftime('%Y-%m-%d %H:%M:%S'),
                'X-No-Cache': str(no_cache)
            })
        else:
            usage_guide = """
            <html>
            <body>
                <h1>代理配置转换工具</h1>
                <p>使用方法:在URL参数中提供包含代理配置的网址。</p>
                <p>示例:<code>http://localhost:8080/?url=https://example.com/path-to-proxy-config</code></p>
                <p>强制获取新数据:<code>http://localhost:8080/?url=https://example.com/path-to-proxy-config&amp;nocache</code></p>
            </body>
            </html>
            """
            return web.Response(text=usage_guide, content_type='text/html')
    else:
        return web.Response(text="Not Found", status=404)
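
# Typical interaction (the URL below is a placeholder): GET /?url=https://example.com/sub
# returns the converted proxy list as text/plain together with the X-Proxy-Count,
# X-Cache-Hit, X-Cache-Time and X-No-Cache headers; appending &nocache forces a
# fresh fetch, with the cached copy kept only as a fallback on fetch errors.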


@web.middleware
async def logging_middleware(request, handler):
    """Log one line per request, with cache diagnostics pulled from the response headers."""
    start_time = datetime.datetime.now()
    try:
        response = await handler(request)
        end_time = datetime.datetime.now()

        timestamp = end_time.strftime('%Y-%m-%d %H:%M:%S')
        client_ip = get_client_ip(request)
        target_url = request.query.get('url', '-')
        no_cache = 'nocache' in request.query
        status_code = response.status
        proxy_count = response.headers.get('X-Proxy-Count', '0')
        cache_hit = "Hit" if response.headers.get('X-Cache-Hit') == 'True' else "Miss"
        cache_time = response.headers.get('X-Cache-Time', '-')

        log_message = (
            f"{timestamp} - {client_ip} - \"GET /?url={target_url}{'&nocache' if no_cache else ''}\" - "
            f"Status: {status_code} - Proxies: {proxy_count} - Cache: {cache_hit} - "
            f"CacheTime: {cache_time} - NoCache: {'Yes' if no_cache else 'No'}"
        )
        print(log_message, flush=True)

        return response
    except Exception as e:
        end_time = datetime.datetime.now()
        print(f"Error occurred: {str(e)}", flush=True)
        print(f"Request processing time: {end_time - start_time}", flush=True)
        print("Traceback:", flush=True)
        traceback.print_exc()
        return web.Response(text=f"Internal Server Error: {str(e)}", status=500)


async def init_app():
    app = web.Application(middlewares=[logging_middleware])
    app.router.add_get('/', handle_request)
    return app


if __name__ == "__main__":
    print(f"===== Application Startup at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} =====")
    print("Server running on port 8080")
    # Suppress aiohttp's own startup banner; the lines above replace it.
    web.run_app(init_app(), port=8080, print=lambda _: None)
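
# Example run (the file name below is a placeholder for wherever this script is saved):
#
#   python proxy_converter.py
#   curl -i "http://localhost:8080/?url=https://example.com/clash.yaml"
#
# The curl URL is illustrative; any reachable Clash or sing-box subscription URL works.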