|
import re |
|
import yaml |
|
import aiohttp |
|
import asyncio |
|
import datetime |
|
import sys |
|
import traceback |
|
from aiohttp import web, ClientTimeout, TCPConnector |
|
from urllib.parse import parse_qs |
|
from cachetools import TTLCache |
|
|
|
|
|
cache = TTLCache(maxsize=1000, ttl=3600) |
|
|
|
async def fetch_url(url, session, max_retries=3, timeout=180): |
|
for attempt in range(max_retries): |
|
try: |
|
async with session.get(url, timeout=ClientTimeout(total=timeout)) as response: |
|
response.raise_for_status() |
|
return await response.text() |
|
except asyncio.TimeoutError: |
|
print(f"Attempt {attempt + 1} timed out after {timeout} seconds", flush=True) |
|
except aiohttp.ClientError as e: |
|
print(f"Attempt {attempt + 1} failed: {str(e)}", flush=True) |
|
|
|
if attempt < max_retries - 1: |
|
await asyncio.sleep(5) |
|
|
|
raise Exception(f"Failed to fetch URL after {max_retries} attempts") |
|
|
|
async def extract_and_transform_proxies(input_text): |
|
print("Original input data (first 1000 characters):") |
|
print(input_text[:1000]) |
|
print("------------------------") |
|
|
|
try: |
|
|
|
data = yaml.safe_load(input_text) |
|
if isinstance(data, dict) and 'proxies' in data: |
|
proxies_list = data['proxies'] |
|
else: |
|
|
|
proxies_match = re.search(r'proxies:\s*\n((?:[-\s]*{.*\n?)*)', input_text, re.MULTILINE | re.DOTALL) |
|
if proxies_match: |
|
proxies_text = proxies_match.group(1) |
|
proxies_list = yaml.safe_load(proxies_text) |
|
else: |
|
return "未找到有效的代理配置" |
|
except yaml.YAMLError as e: |
|
return f"YAML解析错误: {str(e)}" |
|
|
|
if not proxies_list: |
|
return "未找到有效的代理配置" |
|
|
|
print(f"Found {len(proxies_list)} possible proxy configurations") |
|
print("Sample of parsed proxies list:") |
|
print(proxies_list[:5]) |
|
print("------------------------") |
|
|
|
transformed_proxies = [] |
|
|
|
for proxy in proxies_list: |
|
if proxy.get('type') == 'ss': |
|
name = proxy.get('name', '').strip() |
|
server = proxy.get('server', '').strip() |
|
port = str(proxy.get('port', '')).strip() |
|
|
|
ss_parts = [f"{name} = ss, {server}, {port}"] |
|
|
|
if 'cipher' in proxy: |
|
ss_parts.append(f"encrypt-method={proxy['cipher'].strip()}") |
|
if 'password' in proxy: |
|
ss_parts.append(f"password={proxy['password'].strip()}") |
|
if 'udp' in proxy: |
|
ss_parts.append(f"udp-relay={'true' if proxy['udp'] in [True, 'true', 'True'] else 'false'}") |
|
|
|
transformed = ", ".join(ss_parts) |
|
transformed_proxies.append(transformed) |
|
|
|
elif proxy.get('type') == 'trojan': |
|
name = proxy.get('name', '').strip() |
|
server = proxy.get('server', '').strip() |
|
port = str(proxy.get('port', '')).strip() |
|
|
|
trojan_parts = [f"{name} = trojan, {server}, {port}"] |
|
|
|
if 'password' in proxy: |
|
trojan_parts.append(f"password={proxy['password'].strip()}") |
|
if 'sni' in proxy: |
|
trojan_parts.append(f"sni={proxy['sni'].strip()}") |
|
if 'skip-cert-verify' in proxy: |
|
trojan_parts.append(f"skip-cert-verify={str(proxy['skip-cert-verify']).lower()}") |
|
if 'udp' in proxy: |
|
trojan_parts.append(f"udp={'true' if proxy['udp'] in [True, 'true', 'True'] else 'false'}") |
|
|
|
transformed = ", ".join(trojan_parts) |
|
transformed_proxies.append(transformed) |
|
|
|
return "\n".join(transformed_proxies) if transformed_proxies else "未找到有效的SS或Trojan代理配置" |
|
|
|
async def log_request(request, response): |
|
timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') |
|
client_ip = request.remote |
|
request_line = f"{request.method} {request.path}" |
|
if request.query_string: |
|
request_line += f"?{request.query_string}" |
|
status_code = response.status |
|
content_length = response.content_length |
|
|
|
log_message = f"{timestamp} - {client_ip} - \"{request_line}\" {status_code} {content_length}" |
|
print(log_message, flush=True) |
|
|
|
@web.middleware |
|
async def logging_middleware(request, handler): |
|
start_time = datetime.datetime.now() |
|
try: |
|
response = await handler(request) |
|
await log_request(request, response) |
|
end_time = datetime.datetime.now() |
|
print(f"Request processing time: {end_time - start_time}", flush=True) |
|
return response |
|
except Exception as e: |
|
end_time = datetime.datetime.now() |
|
print(f"Error occurred: {str(e)}", flush=True) |
|
print(f"Request processing time: {end_time - start_time}", flush=True) |
|
print("Traceback:", flush=True) |
|
traceback.print_exc() |
|
return web.Response(text=f"Internal Server Error: {str(e)}", status=500) |
|
|
|
async def handle_request(request): |
|
if request.path == '/': |
|
query_params = parse_qs(request.query_string) |
|
if 'url' in query_params: |
|
url = query_params['url'][0] |
|
|
|
|
|
if url in cache: |
|
print(f"Cache hit for URL: {url}", flush=True) |
|
return web.Response(text=cache[url], content_type='text/plain') |
|
|
|
try: |
|
print(f"Fetching URL: {url}", flush=True) |
|
async with aiohttp.ClientSession(connector=TCPConnector(ssl=False)) as session: |
|
input_text = await fetch_url(url, session, max_retries=3, timeout=180) |
|
print(f"URL content length: {len(input_text)}", flush=True) |
|
result = await extract_and_transform_proxies(input_text) |
|
print(f"Transformed result length: {len(result)}", flush=True) |
|
print("First 1000 characters of transformed result:") |
|
print(result[:1000]) |
|
print("------------------------") |
|
|
|
|
|
cache[url] = result |
|
|
|
return web.Response(text=result, content_type='text/plain') |
|
except Exception as e: |
|
error_message = f"Error processing request: {str(e)}\n{traceback.format_exc()}" |
|
print(error_message, flush=True) |
|
return web.Response(text=error_message, status=500) |
|
else: |
|
usage_guide = """ |
|
<html> |
|
<body> |
|
<h1>代理配置转换工具</h1> |
|
<p>使用方法:在URL参数中提供包含代理配置的网址。</p> |
|
<p>示例:<code>http://localhost:8080/?url=https://example.com/path-to-proxy-config</code></p> |
|
</body> |
|
</html> |
|
""" |
|
return web.Response(text=usage_guide, content_type='text/html') |
|
else: |
|
return web.Response(text="Not Found", status=404) |
|
|
|
async def init_app(): |
|
app = web.Application(middlewares=[logging_middleware]) |
|
app.router.add_get('/', handle_request) |
|
return app |
|
|
|
if __name__ == "__main__": |
|
print(f"===== Application Startup at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} =====") |
|
print("Server running on port 8080") |
|
web.run_app(init_app(), port=8080, print=lambda _: None) |
|
|