File size: 8,794 Bytes
5120311 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from entity import InputHotTopic, ResponseQueue
from threading import Thread
from queue import Queue
import time
import json, requests
from service_cache_Thien import get_data_sorl
import os
from datetime import datetime
from email_validator import push_msg_tele
import time
import json
import hashlib
from pydantic import BaseModel
class InputHotTopic(BaseModel):
start_time: str = "2024-09-03 23:00:00"
end_time: str = "2024-09-05 23:00:00"
query: str = "Giá nhà chung cư trên Hà Nội"
keywords: list = ["chung cư, Hà Nội", "Hoà Lạc"]
top_cluster: int = 5
prompt: str = """Trong 300 từ, hãy tổng hợp thành một đoạn văn một cách đầy đủ, chi tiết, và trung thực về các chủ đề xung quanh biến động giá nhà chung cư Hà Nội từ nội dung dưới đây.
Nếu không có thông tin gì liên quan đến giá nhà chung cư Hà Nội trong nội dung cung cấp thì trả lời "không có thông tin". Không đưa quan điểm cá nhân, không lặp lại một phần câu hỏi, loại bỏ phần mở đầu. Không có những câu từ liên kết như: "Sau đây là nội dung tóm tắt", "Nội dung tóm tắt là", "Dưới đây là " ... """
check_relevent: str = "Hãy đánh giá nội dung dưới đây có thông tin liên quan đến giá cả nhà chung cư Hà Nội hay không? Chỉ trả lời có hoặc không, không đưa thêm thông tin không liên quan"
max_posts: int = 5000
def get_hash_id(item: InputHotTopic):
str_hash = ""
if item.id_topic:
str_hash += item.id_topic
str_hash += item.start_time
return hashlib.sha224(str_hash.encode("utf-8")).hexdigest()
else:
return ""
class SessionProcess(object):
def __init__(self):
self.session = dict()
def hash_session(self, query: InputHotTopic):
hash_dict = query.dict()
hash_dict['time'] = int(time.time())
return hashlib.sha224(json.dumps(hash_dict).encode("utf-8")).hexdigest()
def insert_session(self, data_input):
print('data_input: ', data_input)
# if self.mode == "command_center":
# hash_id = hash_session(data_input)
# else:
hash_id = self.hash_session(data_input)
if hash_id not in self.session:
self.session[hash_id] = {"status": 0, "created_time": time.time(), "update_time": time.time(),
"result": {}, "data": data_input}
return hash_id
def get_info_session(self, hash_id: str):
if hash_id in self.session:
return self.session[hash_id]
return {"status": -2, "result": {}, "meta": {}}
def update_session(self, hash_id: str, result: dict, status: int):
if hash_id in self.session:
self.session[hash_id]["status"] = status
self.session[hash_id]["result"] = result
self.session[hash_id]["update_time"] = time.time()
return True
return False
def delete_session(self, hash_id: str):
if hash_id in self.session:
del self.session[hash_id]
return True
return False
SESSION = SessionProcess()
app = FastAPI(title="Hot Topic")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
NUM_OF_THREAD = 2
QQ = Queue(maxsize=0) # don't limit queue
def process_wc():
print('Run thr')
global SESSION, QQ
while True:
if not QQ.empty():
hash_id = QQ.get()
SESSION.update_session(hash_id, {}, 0)
print("update trạng thái status = 0: đang xử lý")
try:
ss_info = SESSION.get_info_session(hash_id)
status = ss_info["status"]
print("trạng thái hiện tại: ", status)
if status == 0:
data_input = SESSION.session[hash_id]["data"]
res_doc = get_data_sorl(data_input.query, data_input.keywords, data_input.start_time, data_input.end_time, max_posts = data_input.max_posts)
print('lenght res_doc: ', len(res_doc))
if not res_doc:
SESSION.update_session(hash_id, {}, -1)
else:
# start_time: str = "2024-03-03 23:00:00"
current_time = datetime.now()
time_now = current_time.strftime("%Y-%m-%d %H:%M:%S")
d = {
"id_topic": "99999",
"start_time": time_now,
"end_time": data_input.end_time,
"threshold": 0.3,
"top_sentence": -1,
"top_cluster": data_input.top_cluster,
"topn_summary": 10,
"type_cluster": "",
"lang_process": "",
"prompt": data_input.prompt,
"topic_name": data_input.check_relevent,
"responseHeader": {},
"benchmark_topics": [],
"response": {"docs": res_doc}
}
str_hash = ""
str_hash += "99999"
str_hash += time_now
hash_id_path = hashlib.sha224(str_hash.encode("utf-8")).hexdigest()
st_time = time.time()
try:
response = requests.post('http://10.9.3.241:8636/newsanalysis/topic_clustering', json=d, timeout=5)
except:
print("Timeout done")
print("push done msg")
res_clus = {}
# flag = False
# count = 0
# while not flag and count < 18000:
# if os.path.exists("/home/vietle/topic-clustering/log/result_{0}.txt".format(hash_id_path)):
# path_res = "/home/vietle/topic-clustering/log/result_{0}.txt".format(hash_id_path)
# with open(path_res, encoding="utf-8") as ff:
# res_clus = json.load(ff)
# res_clus["num_articles"] = len(res_doc)
# message = "Hello"
# push_msg_tele(data_input.bot_token , data_input.chat_id , message)
# print('done processing result')
# flag = True
# time.sleep(1)
# count +=1
# print('sleep: ', count)
print("update done msg")
SESSION.update_session(hash_id_path, res_clus, 1)
except Exception as ve_:
print(ve_)
SESSION.update_session(hash_id_path, {}, -1)
raise ve_
else:
time.sleep(2)
for _ in range(NUM_OF_THREAD):
worker = Thread(target=process_wc, args=())
worker.setDaemon(True)
worker.start()
@app.post("/api/v1/send_message")
def send_requests(item: InputHotTopic):
global SESSION
hash_id = SESSION.insert_session(item)
if SESSION.session[hash_id]["status"] == 0:
QQ.put(hash_id)
return ResponseQueue(statusCode=1, message="Push to queue done !", result={"hash_id": hash_id})
class InputSession(BaseModel):
hash_id: str = ""
class Response(BaseModel):
statusCode: int = 200
message: str = ""
result: dict = {}
@app.post("/api/mining/qna/result")
def get_result(item: InputSession):
global SESSION
res = SESSION.get_info_session(item.hash_id)
status = res["status"]
res = res["result"]
if status == -1:
msg = "ERROR"
elif status == 0:
msg = "processing ..."
elif status == 1:
msg = "done"
# SESSION.delete_session(item.hash_id)
else:
msg = "nothing"
return Response(statusCode=status, message=msg, result=res) |