Spaces:
Sleeping
Sleeping
from flask import Flask, request, render_template_string, send_from_directory, jsonify | |
from flask import render_template | |
import sqlite3 | |
import os | |
import uuid | |
import json | |
import base64 | |
import unittest | |
import requests | |
own_url = os.getenv('own_url') | |
key_d = os.getenv('gc_api') | |
gc_url = os.getenv('gc_url') | |
import json | |
from datetime import datetime | |
import whatsapp_api_webhook_server_python.webhooksHandler as handler | |
# Flask application; templates are looked up in the current directory.
app = Flask(__name__, template_folder="./")
# NOTE(review): debug mode is enabled unconditionally while the server is
# started on host 0.0.0.0 at the bottom of this file — confirm this is
# intended before exposing the app publicly.
app.config['DEBUG'] = True

UPLOAD_FOLDER = 'static'
# Create the upload directory if it does not exist. ``exist_ok=True`` avoids
# the check-then-create race of a separate ``os.path.exists`` test.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# SQLite database files that are initialised with a ``contacts`` table.
DATABASES = ['data1.db', 'data2.db', 'data3.db', 'data4.db', 'data5.db']
def init_db(db_name):
    """Create the ``contacts`` table in the SQLite database *db_name*.

    The table is created only if it does not exist yet, so the call is safe
    to repeat. Database errors are reported on stdout and swallowed so that
    one bad database does not stop the remaining ones from being set up.

    Args:
        db_name: Path of the SQLite database file.
    """
    conn = None
    try:
        conn = sqlite3.connect(db_name)
        conn.execute('''
            CREATE TABLE IF NOT EXISTS contacts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                phone TEXT NOT NULL,
                email TEXT NOT NULL
            )
        ''')
        conn.commit()
    except sqlite3.Error as e:
        print(f"Error initializing database {db_name}: {e}")
    finally:
        # Close the connection even when the statement or commit failed;
        # previously it was leaked on any error after connect().
        if conn is not None:
            conn.close()
# Make sure every configured database file has its contacts table.
for db in DATABASES:
    init_db(db)
def settings():
    """Return the rendered ``settings.html`` template."""
    template_name = 'settings.html'
    return render_template(template_name)
def onli():
    """Return the rendered ``online.html`` template."""
    template_name = 'online.html'
    return render_template(template_name)
def veref():
    """Return the rendered ``ver.html`` template."""
    template_name = 'ver.html'
    return render_template(template_name)
def se_mes():
    """Return the rendered ``se_mes.html`` template."""
    template_name = 'se_mes.html'
    return render_template(template_name)
def se_mes_im():
    """Return the rendered ``se_mes_im.html`` template."""
    template_name = 'se_mes_im.html'
    return render_template(template_name)
def se_mes_ran():
    """Return the rendered ``se_mes_ran.html`` template."""
    template_name = 'se_mes_ran.html'
    return render_template(template_name)
def se_mes_im_ran():
    """Return the rendered ``se_mes_im_ran.html`` template."""
    template_name = 'se_mes_im_ran.html'
    return render_template(template_name)
def se_mes_im2():
    """Return the rendered ``se_mes_im2.html`` template."""
    template_name = 'se_mes_im2.html'
    return render_template(template_name)
def se_mes_f():
    """Return the rendered ``se_mes_f.html`` template."""
    template_name = 'se_mes_f.html'
    return render_template(template_name)
def up_gr():
    """Return the rendered ``up_gr.html`` template."""
    template_name = 'up_gr.html'
    return render_template(template_name)
def up_user_gp():
    """Return the rendered ``up_user_gp.html`` template."""
    template_name = 'up_user_gp.html'
    return render_template(template_name)
def del_user_gp():
    """Return the rendered ``del_user_gp.html`` template."""
    template_name = 'del_user_gp.html'
    return render_template(template_name)
def up_ad():
    """Return the rendered ``up_ad.html`` template."""
    template_name = 'up_ad.html'
    return render_template(template_name)
def del_ad():
    """Return the rendered ``del_ad.html`` template."""
    template_name = 'del_ad.html'
    return render_template(template_name)
def se_opr():
    """Return the rendered ``se_opr.html`` template."""
    template_name = 'se_opr.html'
    return render_template(template_name)
def online():
    """Return the rendered ``online.html`` template."""
    template_name = 'online.html'
    return render_template(template_name)
def upload_file():
    """Store the uploaded ``file`` form part under a unique name.

    The file keeps its original extension but gets a UUID4 base name and is
    saved into ``UPLOAD_FOLDER``.

    Returns:
        A Flask response tuple: 400 with a short message when no file part
        or no filename was sent, otherwise 200 with a message containing the
        https URL built from the request root, ``uploads/`` and the new name.
    """
    upload = request.files.get('file')
    if upload is None:
        return "No file part", 400
    # Treat a missing filename like an empty one instead of failing in
    # os.path.splitext() below.
    if not upload.filename:
        return "No selected file", 400

    # Generate a unique file name, keeping the client-supplied extension.
    # NOTE(review): the extension is taken from the client unchanged —
    # confirm whether it should be restricted to an allow-list.
    extension = os.path.splitext(upload.filename)[1]
    unique_filename = f"{uuid.uuid4()}{extension}"
    upload.save(os.path.join(UPLOAD_FOLDER, unique_filename))

    # Return the full URL of the uploaded file, forcing the https scheme.
    base_url = request.url_root.replace('http://', 'https://')
    full_url = f"{base_url}uploads/{unique_filename}"
    # The message echoes the client-controlled extension, so send it as
    # plain text rather than the default text/html.
    return (
        f"File uploaded successfully and saved to {full_url}",
        200,
        {"Content-Type": "text/plain; charset=utf-8"},
    )
def uploaded_file(filename):
    """Return *filename* served from ``UPLOAD_FOLDER``.

    Args:
        filename: Name of the file inside the upload directory.
    """
    response = send_from_directory(UPLOAD_FOLDER, filename)
    return response
def up_fa():
    """Return the rendered ``up_fa.html`` template."""
    template_name = 'up_fa.html'
    return render_template(template_name)
# Handler for the GET request coming from gc.
def add_contact():
    """Insert a contact taken from the query string into ``data1.db``.

    Reads the ``name``, ``phone`` and ``email`` query parameters; all three
    must be non-empty.

    Returns:
        A Flask response tuple: 400 when a parameter is missing, 500 when
        the insert fails, otherwise 200 with a plain-text confirmation.
    """
    name = request.args.get('name')
    phone = request.args.get('phone')
    email = request.args.get('email')
    if not (name and phone and email):
        return "Parameters 'name', 'phone', and 'email' are required.", 400

    conn = None
    try:
        conn = sqlite3.connect('data1.db')
        conn.execute(
            'INSERT INTO contacts (name, phone, email) VALUES (?, ?, ?)',
            (name, phone, email),
        )
        conn.commit()
    except Exception as e:
        print(f"Error adding contact: {e}")
        return "Internal Server Error", 500
    finally:
        # Close the connection on the error path too; it used to be leaked
        # whenever the insert or commit raised.
        if conn is not None:
            conn.close()

    # The confirmation echoes request parameters, so it is sent as plain
    # text to keep the values from being interpreted as HTML.
    return (
        f"Contact added: {name} - {phone} - {email}",
        200,
        {"Content-Type": "text/plain; charset=utf-8"},
    )
# Template for the contacts table; rendered by show_contacts().
_CONTACTS_PAGE = '''
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<title>Contacts</title>
<style>
table {
width: 70%;
border-collapse: collapse;
}
th, td {
border: 1px solid black;
padding: 8px;
text-align: left;
}
th {
background-color: #f2f2f2;
}
</style>
</head>
<body>
<h1>Contacts</h1>
<table>
<tr>
<th>Name</th>
<th>Phone</th>
<th>Email</th>
</tr>
{% for contact in contacts %}
<tr>
<td>{{ contact[0] }}</td>
<td>{{ contact[1] }}</td>
<td>{{ contact[2] }}</td>
</tr>
{% endfor %}
</table>
</body>
</html>
'''


def show_contacts():
    """Render every row of the ``contacts`` table in ``data1.db`` as HTML.

    Returns:
        The rendered page, or an ``("Internal Server Error", 500)`` tuple
        when reading the database or rendering the template fails.
    """
    conn = None
    try:
        conn = sqlite3.connect('data1.db')
        contacts = conn.execute(
            'SELECT name, phone, email FROM contacts'
        ).fetchall()
        return render_template_string(_CONTACTS_PAGE, contacts=contacts)
    except Exception as e:
        print(f"Error showing contacts: {e}")
        return "Internal Server Error", 500
    finally:
        # Close the connection even if the query failed; it used to be
        # leaked on that path.
        if conn is not None:
            conn.close()
# Module-level data variables.
action_d = "add"
# The remaining fields all start out as empty strings.
params_d = name_d = email_d = phone_d = ""
pr1_d = pr2_d = pr3_d = ""
def gc_db():
    """Send user data from the query string to the service at ``gc_url``.

    Reads ``name``, ``email``, ``phone``, ``pr1``, ``pr2`` and ``pr3`` from
    the GET parameters (each defaults to an empty string), wraps them in a
    JSON document, Base64-encodes it and POSTs it as form data together with
    ``key_d`` and ``action_d``.

    Returns:
        A dict with the upstream ``status_code`` and ``response_body``.

    Raises:
        requests.RequestException: presumably propagated unchanged when the
            POST fails or times out — nothing here catches it.
    """
    # Read the parameters from the GET request.
    args = request.args

    # Build the JSON document.
    json_data = {
        "user": {
            "email": args.get('email', ''),
            "phone": args.get('phone', ''),
            "first_name": args.get('name', ''),
            "addfields": {
                "pr1": args.get('pr1', ''),
                "pr2": args.get('pr2', ''),
                "pr3": args.get('pr3', ''),
            },
        },
        "system": {
            "refresh_if_exists": 1,
        },
        "session": {
            "utm_source": "",
            "utm_medium": "",
            "utm_content": "",
            "utm_campaign": "",
            "utm_group": "",
            "gcpc": "",
            "gcao": "",
            "referer": "",
        },
    }

    # Convert the JSON document to Base64.
    encoded_params = base64.b64encode(
        json.dumps(json_data).encode('utf-8')
    ).decode('utf-8')

    # Form fields sent in the request body.
    data = {
        'key': key_d,
        'action': action_d,
        'params': encoded_params,
    }

    # Send the POST request as form data. Without a timeout a stalled
    # upstream would block this worker indefinitely.
    response = requests.post(gc_url, data=data, timeout=30)

    # Hand the upstream answer back to the caller.
    return {
        'status_code': response.status_code,
        'response_body': response.text,
    }
if __name__ == '__main__':
    # Listen on all interfaces; the port comes from PORT, defaulting to 7860.
    port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=port)