File size: 8,794 Bytes
5120311
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from entity import InputHotTopic, ResponseQueue
from threading import Thread
from queue import Queue
import time
import json, requests
from service_cache_Thien import get_data_sorl
import os
from datetime import datetime
from email_validator import push_msg_tele

import time
import json
import hashlib


from pydantic import BaseModel

class InputHotTopic(BaseModel):
    start_time: str = "2024-09-03 23:00:00"
    end_time: str = "2024-09-05 23:00:00"
    query: str = "Giá nhà chung cư trên Hà Nội"
    keywords: list = ["chung cư, Hà Nội", "Hoà Lạc"]
    top_cluster: int = 5
    prompt: str = """Trong 300 từ, hãy tổng hợp thành một đoạn văn một cách đầy đủ, chi tiết, và trung thực về các chủ đề xung quanh biến động giá nhà chung cư Hà Nội từ nội dung dưới đây.

Nếu không có thông tin gì liên quan đến giá nhà chung cư Hà Nội trong nội dung cung cấp thì trả lời "không có thông tin".  Không đưa quan điểm cá nhân, không lặp lại một phần câu hỏi, loại bỏ phần mở đầu. Không có những câu từ liên kết như: "Sau đây là nội dung tóm tắt", "Nội dung tóm tắt là", "Dưới đây là " ... """
    check_relevent: str = "Hãy đánh giá nội dung dưới đây có thông tin liên quan đến giá cả nhà chung cư Hà Nội hay không? Chỉ trả lời có hoặc không, không đưa thêm thông tin không liên quan"
    max_posts: int = 5000
    
def get_hash_id(item: InputHotTopic):
    str_hash = ""
    if item.id_topic:
        str_hash += item.id_topic
        str_hash += item.start_time
        return hashlib.sha224(str_hash.encode("utf-8")).hexdigest()
    else:
        return ""
    
class SessionProcess(object):

    def __init__(self):
        self.session = dict()
        
    def hash_session(self, query: InputHotTopic):
        hash_dict = query.dict()
        hash_dict['time'] = int(time.time())
        return hashlib.sha224(json.dumps(hash_dict).encode("utf-8")).hexdigest()
    
    def insert_session(self, data_input):
        print('data_input: ', data_input)
        # if self.mode == "command_center":
        #     hash_id = hash_session(data_input)
        # else:
        hash_id = self.hash_session(data_input)
        if hash_id not in self.session:
            self.session[hash_id] = {"status": 0, "created_time": time.time(), "update_time": time.time(),
                                     "result": {}, "data": data_input}
        return hash_id

    def get_info_session(self, hash_id: str):
        if hash_id in self.session:
            return self.session[hash_id]
        return {"status": -2, "result": {}, "meta": {}}

    def update_session(self, hash_id: str, result: dict, status: int):
        if hash_id in self.session:
            self.session[hash_id]["status"] = status
            self.session[hash_id]["result"] = result
            self.session[hash_id]["update_time"] = time.time()
            return True
        return False

    def delete_session(self, hash_id: str):
        if hash_id in self.session:
            del self.session[hash_id]
            return True
        return False

SESSION = SessionProcess()
app = FastAPI(title="Hot Topic")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

NUM_OF_THREAD = 2
QQ = Queue(maxsize=0) # don't limit queue

def process_wc():
    print('Run thr')
    global SESSION, QQ
    while True:
        if not QQ.empty():
            hash_id = QQ.get()
            SESSION.update_session(hash_id, {}, 0)
            print("update trạng thái status = 0: đang xử lý")
            try:
                ss_info = SESSION.get_info_session(hash_id)
                status = ss_info["status"]
                print("trạng thái hiện tại: ", status)
                if status == 0:
                    data_input = SESSION.session[hash_id]["data"]
                    res_doc = get_data_sorl(data_input.query, data_input.keywords, data_input.start_time, data_input.end_time, max_posts = data_input.max_posts)
                    print('lenght res_doc: ', len(res_doc))
                    if not res_doc:
                        SESSION.update_session(hash_id, {}, -1)
                    else: 
                        # start_time: str = "2024-03-03 23:00:00"
                        current_time = datetime.now()
                        time_now = current_time.strftime("%Y-%m-%d %H:%M:%S")
                        d = {
                            "id_topic": "99999",
                            "start_time": time_now,
                            "end_time": data_input.end_time,
                            "threshold": 0.3,
                            "top_sentence": -1,
                            "top_cluster": data_input.top_cluster,
                            "topn_summary": 10,
                            "type_cluster": "",
                            "lang_process": "",
                            "prompt": data_input.prompt,
                            "topic_name": data_input.check_relevent,
                            "responseHeader": {},
                            "benchmark_topics": [],
                            "response": {"docs": res_doc}
                        }
                        
                        str_hash = ""
                        str_hash += "99999"
                        str_hash += time_now
                        hash_id_path = hashlib.sha224(str_hash.encode("utf-8")).hexdigest()
                        
                        st_time = time.time()
                        try:
                            response = requests.post('http://10.9.3.241:8636/newsanalysis/topic_clustering', json=d, timeout=5)
                        except:
                            print("Timeout done")
                            
                        print("push done msg")
                        res_clus = {}
                        # flag = False
                        # count = 0
                        # while not flag and count < 18000:
                        #     if os.path.exists("/home/vietle/topic-clustering/log/result_{0}.txt".format(hash_id_path)):
                        #         path_res = "/home/vietle/topic-clustering/log/result_{0}.txt".format(hash_id_path)
                        #         with open(path_res, encoding="utf-8") as ff:
                        #             res_clus = json.load(ff)
                        #             res_clus["num_articles"] = len(res_doc)
                        #             message = "Hello"
                        #             push_msg_tele(data_input.bot_token , data_input.chat_id , message)
                        #             print('done processing result')
                        #             flag = True
                        #     time.sleep(1)
                        #     count +=1
                        #     print('sleep: ', count)
                            
                            
                        print("update done msg")
                        SESSION.update_session(hash_id_path, res_clus, 1)
            except Exception as ve_:
                print(ve_)
                SESSION.update_session(hash_id_path, {}, -1)
                raise ve_
        else:
            time.sleep(2)


for _ in range(NUM_OF_THREAD):
    worker = Thread(target=process_wc, args=())
    worker.setDaemon(True)
    worker.start()
    
    
@app.post("/api/v1/send_message")
def send_requests(item: InputHotTopic):
    global SESSION
    hash_id = SESSION.insert_session(item)
    if SESSION.session[hash_id]["status"] == 0:
        QQ.put(hash_id)

    return ResponseQueue(statusCode=1, message="Push to queue done !", result={"hash_id": hash_id})

class InputSession(BaseModel):
    hash_id: str = ""
    
class Response(BaseModel):
    statusCode: int = 200
    message: str = ""
    result: dict = {}
@app.post("/api/mining/qna/result")
def get_result(item: InputSession):
    global SESSION
    res = SESSION.get_info_session(item.hash_id)
    status = res["status"]
    res = res["result"]
    if status == -1:
        msg = "ERROR"
    elif status == 0:
        msg = "processing ..."
    elif status == 1:
        msg = "done"
        # SESSION.delete_session(item.hash_id)
    else:
        msg = "nothing"
    return Response(statusCode=status, message=msg, result=res)