# -*- coding: utf-8 -*-
import json

import pika


class ICRabbitMQ(object):
    """Small convenience wrapper around a pika ``BlockingConnection``.

    The instance owns the connection; channels are created by
    :meth:`init_queue` and then handed to the static helpers
    :meth:`publish_message` and :meth:`run_consummer`.
    """

    def __init__(self, host, virtual_host, usr, passw, **kwargs):
        """Store the connection settings; no network I/O happens here.

        :param host: address of the RabbitMQ server
        :param virtual_host: RabbitMQ virtual host to connect to
        :param usr: RabbitMQ user name
        :param passw: RabbitMQ password
        :param kwargs: extra options; stored on ``self.kwargs`` but not
            used anywhere in this class
        """
        self.host = host
        self.virtual_host = virtual_host
        self.user = usr
        self.passw = passw
        self.credentials = pika.PlainCredentials(usr, passw)
        # Created lazily by init_connection() / init_queue().
        self.connection = None
        self.kwargs = kwargs

    def init_connection(self):
        """Open a new blocking connection and keep it on ``self.connection``."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=self.host,
                virtual_host=self.virtual_host,
                credentials=self.credentials,
            )
        )

    def connection_close(self):
        """Close the connection if one is currently open.

        Calling this before a connection was created, or after it has
        already been closed, is a no-op.
        """
        if self.connection is not None and self.connection.is_open:
            self.connection.close()

    def connection_status(self):
        """Return ``True`` when a connection exists and is open.

        :return: ``False`` if no connection was ever created, otherwise
            the connection's ``is_open`` flag
        """
        if self.connection is None:
            return False
        return self.connection.is_open

    def init_queue(self, queue_name, exchange="", exchange_type='fanout',
                   durable=True, max_priority=-1):
        """Declare a queue or an exchange and return the channel used.

        When ``exchange`` is empty and ``queue_name`` is not, the queue is
        declared. In every other case the exchange is declared instead
        (no queue is declared or bound in that case).

        :param queue_name: name of the queue
        :param exchange: name of the exchange; ``""`` selects the
            queue-declaration path
        :param exchange_type: type passed to ``exchange_declare``
            (defaults to ``'fanout'``)
        :param durable: if true, the queue/exchange is declared durable
            (it survives a RabbitMQ restart)
        :param max_priority: ``-1`` declares the queue without priority
            support; any other value is sent as ``x-max-priority``
            (the original author's note suggests a maximum of 10)
        :return: the channel on which the declaration was made
        """
        # Connect on first use, and reconnect if the previous connection
        # has been closed; channel() cannot be called on a closed one.
        if self.connection is None or not self.connection.is_open:
            self.init_connection()
        channel = self.connection.channel()

        if exchange == "" and queue_name != "":
            declare_kwargs = {'queue': queue_name, 'durable': durable}
            if max_priority != -1:
                declare_kwargs['arguments'] = {'x-max-priority': max_priority}
            channel.queue_declare(**declare_kwargs)
        else:
            # Honour the caller's exchange_type instead of a fixed 'fanout'.
            channel.exchange_declare(exchange=exchange,
                                     exchange_type=exchange_type,
                                     durable=durable)
        return channel

    @staticmethod
    def publish_message(channel, routing_key, body, priority=-1,
                        delivery_mode=2, exchange=''):
        """Publish ``body`` as a JSON-encoded message.

        :param channel: channel previously obtained from :meth:`init_queue`
        :param routing_key: routing key, or the queue name when
            ``exchange`` is ``''``
        :param body: JSON-serialisable payload; encoded with ``json.dumps``
        :param priority: message priority; ``-1`` publishes without a
            priority property
        :param delivery_mode: AMQP delivery mode (1 = transient,
            2 = persistent)
        :param exchange: exchange to publish to; ``''`` is the default
            exchange
        """
        props = {'delivery_mode': delivery_mode}
        if priority != -1:
            props['priority'] = priority
        channel.basic_publish(exchange=exchange,
                              routing_key=routing_key,
                              body=json.dumps(body),
                              properties=pika.BasicProperties(**props))
        print("push done: ")

    @staticmethod
    def run_consummer(channel, queue_name, callback_func, is_ack=False):
        """Consume messages from ``queue_name`` until consuming is stopped.

        Each message body is decoded as UTF-8 JSON and passed to
        ``callback_func(body, properties)``. This call blocks inside
        ``channel.start_consuming()``.

        :param channel: channel previously obtained from :meth:`init_queue`
        :param queue_name: name of the queue to consume from
        :param callback_func: user callback taking ``(body, properties)``
        :param is_ack: if true, the message is acknowledged *before* the
            callback runs; otherwise it is acknowledged only after the
            callback returns (so an exception in the callback leaves the
            message unacknowledged)
        :return: None
        """
        print(" *wait message")

        def callback(ch, method, properties, body):
            body = json.loads(body.decode("utf-8"))
            if is_ack:
                ch.basic_ack(delivery_tag=method.delivery_tag)
                callback_func(body, properties)
            else:
                callback_func(body, properties)
                ch.basic_ack(delivery_tag=method.delivery_tag)
            print("receive done: ")

        # Limit the number of unacknowledged messages delivered at once.
        channel.basic_qos(prefetch_count=10)
        channel.basic_consume(queue=queue_name, on_message_callback=callback)
        channel.start_consuming()


if __name__ == '__main__':
    pass
    # Usage sketch. Fill in real connection settings from configuration or
    # the environment; do not commit credentials to source control.
    #
    # host, virtual_host, usr, passw = '<host>', '<virtual_host>', '<user>', '<password>'
    # rab = ICRabbitMQ(host, virtual_host, usr, passw)
    # queue_name = 'test_rb'
    #
    # ## test run producer
    # channel = rab.init_queue(queue_name)
    # body_data = {"2": "3423432423"}
    # ICRabbitMQ.publish_message(channel, queue_name, body_data)
    #
    # ## test run consumer
    # def callback_func(body, properties):
    #     """
    #     User-defined callback.
    #     :param body: decoded message from the queue
    #     :param properties: message properties supplied by pika
    #     :return:
    #     """
    #     print(body)
    #
    # ICRabbitMQ.run_consummer(channel, queue_name, callback_func)