|
import re |
|
import yaml |
|
import requests |
|
import datetime |
|
import sys |
|
from http.server import HTTPServer, BaseHTTPRequestHandler |
|
from urllib.parse import urlparse, parse_qs |
|
|
|
def _proxy_text(value):
    """Return *value* as stripped text; ``None`` becomes an empty string.

    YAML yields native types (e.g. ``password: 123456`` is an ``int``), so
    values must be coerced before any string method is used on them.
    """
    return "" if value is None else str(value).strip()


def _parse_flow_mappings(proxies_text):
    """Best-effort parse of ``{key: value, ...}`` entries when YAML parsing fails.

    This is a naive comma/colon split: a quoted value that itself contains a
    comma is truncated at that comma rather than raising.
    """

    def unquote(text):
        # Drop one pair of matching surrounding quotes, as YAML would.
        text = text.strip()
        if len(text) >= 2 and text[0] == text[-1] and text[0] in "'\"":
            return text[1:-1]
        return text

    entries = []
    for mapping_body in re.findall(r'{(.*?)}', proxies_text, re.DOTALL):
        entry = {}
        for item in mapping_body.split(','):
            key, separator, value = item.partition(':')
            if not separator:
                # Fragment without "key: value" shape; skip it instead of crashing.
                continue
            entry[unquote(key)] = unquote(value)
        entries.append(entry)
    return entries


def _format_proxy(proxy):
    """Format one proxy mapping as an output line, or ``None`` if unsupported.

    Only the ``ss`` and ``trojan`` types are handled.
    """
    proxy_type = proxy.get('type')
    if proxy_type not in ('ss', 'trojan'):
        return None

    name = _proxy_text(proxy.get('name'))
    server = _proxy_text(proxy.get('server'))
    port = _proxy_text(proxy.get('port'))
    parts = [f"{name} = {proxy_type}, {server}, {port}"]

    # (source key, output key) pairs, in the order they are emitted.
    if proxy_type == 'ss':
        text_options = (('cipher', 'encrypt-method'), ('password', 'password'))
    else:
        text_options = (('password', 'password'), ('sni', 'sni'))

    for source_key, output_key in text_options:
        # A key with a null value is treated the same as a missing key.
        if proxy.get(source_key) is not None:
            parts.append(f"{output_key}={_proxy_text(proxy[source_key])}")

    if proxy_type == 'ss':
        if 'udp' in proxy:
            # The flag may be a real bool (YAML) or a string (fallback parser).
            enabled = proxy['udp'] in (True, 'true', 'True')
            parts.append(f"udp-relay={'true' if enabled else 'false'}")
    elif 'skip-cert-verify' in proxy:
        parts.append(f"skip-cert-verify={str(proxy['skip-cert-verify']).lower()}")

    return ", ".join(parts)


def extract_and_transform_proxies(input_text):
    """Convert the ``proxies:`` section of a config text into one line per proxy.

    The section is located with a regex that only accepts flow-style entries
    (lines of the form ``- {key: value, ...}``). It is parsed with
    ``yaml.safe_load``; if that raises ``yaml.YAMLError`` a naive regex-based
    parser is used instead.

    Args:
        input_text: Full configuration text.

    Returns:
        Newline-joined lines of the form ``name = type, server, port, key=value, ...``
        for every ``ss`` / ``trojan`` entry, or a human-readable message string
        when the section is missing, empty, or contains no supported entries.
    """
    proxies_match = re.search(
        r'proxies:\s*\n((?:[-\s]*{.*\n?)*)', input_text, re.MULTILINE
    )
    if not proxies_match:
        return "未找到proxies部分"

    proxies_text = proxies_match.group(1)

    try:
        proxies_list = yaml.safe_load(proxies_text)
    except yaml.YAMLError:
        proxies_list = _parse_flow_mappings(proxies_text)

    # safe_load may return a single mapping, a scalar, or None rather than a list.
    if isinstance(proxies_list, dict):
        proxies_list = [proxies_list]
    elif not isinstance(proxies_list, list):
        proxies_list = []

    if not proxies_list:
        return "未找到有效的代理配置"

    transformed_proxies = []
    for proxy in proxies_list:
        if not isinstance(proxy, dict):
            # Ignore stray scalars / nested lists in the sequence.
            continue
        line = _format_proxy(proxy)
        if line is not None:
            transformed_proxies.append(line)

    if not transformed_proxies:
        return "未找到有效的SS或Trojan代理配置"
    return "\n".join(transformed_proxies)
|
|
|
class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler that fetches a remote config and returns converted proxies.

    ``GET /?url=<address>`` downloads ``<address>``, passes the text to
    ``extract_and_transform_proxies`` and replies with the result as plain
    text. ``GET /`` without a ``url`` parameter returns a short HTML usage
    page. Any other path yields 404.
    """

    # Seconds to wait on the upstream server. Without a timeout a stalled
    # upstream would block this handler (and a single-threaded server) forever.
    fetch_timeout = 15

    def log_request(self, code='-', size='-'):
        """Log only conversion requests (those whose path contains ``url=``)."""
        # ``path`` is not set yet when the base class rejects a malformed
        # request line and calls send_error(), so it must be read defensively.
        path = getattr(self, 'path', '') or ''
        if 'url=' in path:
            timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            sys.stdout.write(
                f"{timestamp} - {self.client_address[0]} - \"{self.requestline}\" {code} {size}\n"
            )
            sys.stdout.flush()

    def _send_body(self, content_type, text):
        """Send a 200 response whose body is *text* encoded as UTF-8."""
        self.send_response(200)
        self.send_header('Content-type', content_type)
        self.end_headers()
        self.wfile.write(text.encode('utf-8'))

    def do_GET(self):
        """Serve the usage page, a converted proxy list, or an error."""
        parsed_path = urlparse(self.path)
        if parsed_path.path != '/':
            self.send_error(404, "Not Found")
            return

        query_params = parse_qs(parsed_path.query)
        if 'url' not in query_params:
            usage_guide = """
            <html>
            <body>
                <h1>代理配置转换工具</h1>
                <p>使用方法:在URL参数中提供包含代理配置的网址。</p>
                <p>示例:<code>http://localhost:8080/?url=https://example.com/path-to-proxy-config</code></p>
            </body>
            </html>
            """
            self._send_body('text/html; charset=utf-8', usage_guide)
            return

        url = query_params['url'][0]
        sys.stdout.write(f"处理URL: {url}\n")
        sys.stdout.flush()

        # NOTE(review): the server fetches any caller-supplied URL, which lets
        # a client reach hosts visible only to this machine (SSRF). Confirm the
        # deployment is not exposed to untrusted clients, or add an allow-list.
        try:
            response = requests.get(url, timeout=self.fetch_timeout)
            response.raise_for_status()
        except requests.RequestException as e:
            # Keep the status-line message short and ASCII: it is encoded as
            # latin-1, so arbitrary exception text there can itself raise.
            # The detail goes in the (escaped, UTF-8) error body instead.
            self.send_error(500, "Error fetching data", explain=str(e))
            return

        try:
            result = extract_and_transform_proxies(response.text)
        except Exception as e:
            # Request boundary: report the failure to the client instead of
            # dropping the connection, and log it since log_message is muted.
            sys.stdout.write(f"Error processing {url}: {e!r}\n")
            sys.stdout.flush()
            self.send_error(500, "Error processing data", explain=str(e))
            return

        self._send_body('text/plain; charset=utf-8', result)

    def log_message(self, format, *args):
        """Silence the default stderr logging; see ``log_request`` instead."""
        pass
|
|
|
def run_server(port=8080):
    """Start the conversion HTTP server and block until it stops.

    Args:
        port: TCP port to listen on. The server binds to ``0.0.0.0``, i.e.
            every network interface of the host.
    """
    server_address = ('0.0.0.0', port)
    # The context manager closes the listening socket even when
    # serve_forever() exits through an exception such as KeyboardInterrupt.
    with HTTPServer(server_address, RequestHandler) as httpd:
        started_at = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        sys.stdout.write(f"===== Application Startup at {started_at} =====\n")
        sys.stdout.write(f"Server running on port {port}\n")
        sys.stdout.flush()
        httpd.serve_forever()
|
|
|
# Script entry point: start the server on its default port when this file is
# executed directly (not when it is imported).
if __name__ == "__main__":
    run_server()
|
|