RabbitMQ中RPC的实现及其通信机制

 RabbitMQ中RPC的实现:客户端发送请求消息,服务端回复响应消息,为了接受响应response,客户端需要发送一个回调队列的地址来接受响应,每条消息在发送的时候会带上一个唯一的correlation_id,相应的服务端处理计算后会将结果返回到对应的correlation_id。

RPC调用流程:

当生产者启动时,它会创建一个匿名的独占回调队列,对于一个RPC请求,生产者发送一条具有两个属性的消息:reply_to(回调队列),correlation_id(每个请求的唯一值),请求被发送到rpc_queue队列,消费者等待该队列上的请求。当一个请求出现时,它会执行该任务,将带有结果的消息发送回生产者。生产者等待回调队列上的数据,当消息出现时,它检查相关ID属性,如果它与请求中的值匹配,则返回对应用程序的响应。

 RabbitMQ斐波拉契计算的RPC,消费者实现:

复制代码
""" 基于RabbitMQ实现RPC通信机制 --> 服务端 """import pika import uuid from functools import lru_cache   class RabbitServer(object):     def __init__(self):         self.conn = pika.BlockingConnection(             pika.ConnectionParameters(host='localhost', port=5672)         )         self.channel = self.conn.channel()          # 声明一个队列,并进行持久化,exclusive设置为false        self.channel.queue_declare(             exclusive=False, durable=True, queue='task_queue'         )          # 声明一个exhange交换机,类型为topic        self.channel.exchange_declare(             exchange='logs_rpc', exchange_type='topic', durable=True         )          # 将队列与交换机进行绑定        routing_keys = ['#']  # 接受所有的消息        for routing_key in routing_keys:             self.channel.queue_bind(                 exchange='logs_rpc', queue='task_queue', routing_key=routing_key             )      @lru_cache()     def fib(self, n):         """         斐波那契数列.===>程序的处理逻辑         使用lru_cache 优化递归         :param n:         :return:         """        if n == 0:             return 0         elif n == 1:             return 1        else:             return self.fib(n - 1) + self.fib(n - 2)      def call_back(self, channel, method, properties, body):         print('------------------------------------------')         print('接收到的消息为(斐波那契数列的入参项为):{}'.format(str(body)))         print('消息的相关属性为:')         print(properties)         value = self.fib(int(body))         print('斐波那契数列的运行结果为:{}'.format(str(value)))          # 交换机将消息发送到队列        self.channel.basic_publish(             exchange='',             routing_key=properties.reply_to,             body=str(value),             properties=pika.BasicProperties(                 delivery_mode=2,                 correlation_id=properties.correlation_id,             ))          # 消费者对消息进行确认        self.channel.basic_ack(delivery_tag=
                        
关键字:
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信