# import os # os.environ['CUDA_VISIBLE_DEVICES'] = "-1" import pika import json import time import requests from merge_topic import main # from get_config import config_params from config import get_config config_params = get_config() ConfigManager = config_params['ConfigManager'] URL_SAVE_CLUSTERING_CMS = ConfigManager["ApiConnects"]["api_save_cluster_newscms"]["BaseUrl"] def update_result(result, id, meta = {}): print(result) print("-----") output = { "id": id, "result":json.dumps(result) } res = requests.post(url=URL_SAVE_CLUSTERING_CMS, json = output) print(res.text) print('Update result !!!!!!!!!') def callback_func(ch, method, properties, body): print("receive done: ") starttime = time.time() body = json.loads(body.decode("utf-8")) with open("/home/vietle/topic-clustering/input_merge1.json", 'w') as f: json.dump(body,f,ensure_ascii = False) req = body req["type"] = "monthly" id = req["id"] meta = req.get('meta', {}) preprocess_reformat = [] preprocess = req["preprocess"] for daily_clusters in preprocess: dict_cluster = {} for i,doc in enumerate(daily_clusters["topic"]): reps_post = doc lst_doc = [reps_post] lst_doc.extend(doc.get("list_posts", [])) dict_cluster[i] = lst_doc it = { "topic": dict_cluster } preprocess_reformat.append(it) req["preprocess"] = preprocess_reformat res = main(req) update_result(res, id, meta=meta) print('Time process:', time.time() - starttime) ch.basic_ack(delivery_tag=method.delivery_tag) def test(): with open("/home/vietle/topic-clustering/input_merge1.json", 'r') as f: body = json.load(f) req = body req["type"] = "monthly" id = req["id"] meta = req.get('meta', {}) preprocess_reformat = [] preprocess = req["preprocess"] for daily_clusters in preprocess: dict_cluster = {} for i,topic in enumerate(daily_clusters["topic"]): dict_cluster[i] = topic it = { "topic": dict_cluster } preprocess_reformat.append(it) req["preprocess"] = preprocess_reformat with open("/home/vietle/topic-clustering/input_merge2.json", 'w') as f: json.dump(req,f,ensure_ascii = False) res = main(req) if __name__ == '__main__': # test() params = ConfigManager['QueueConfigs']['queue_merge_clustering_newscms'] usr_name = params["UserName"] password = str(params["Password"]) host = params["HostName"] virtual_host = params["VirtualHost"] queue_name = params["Queue"] while True: try: credentials = pika.PlainCredentials(usr_name, password) connection = pika.BlockingConnection( pika.ConnectionParameters(host=host, virtual_host=virtual_host, credentials=credentials, heartbeat=3600, blocked_connection_timeout=3600)) channel = connection.channel() channel.queue_declare(queue=queue_name, durable=True, arguments={"x-max-priority": 10}) print(" * wait message") channel.basic_qos(prefetch_count=1) channel.basic_consume(queue=queue_name, on_message_callback=callback_func) channel.start_consuming() except Exception as ex: print(f'[ERROR] ', ex) raise ex