先决条件
本教程假定 RabbitMQ 已经安装,并运行在
localhost标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。从哪里获得帮助
如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们。
在第 教程[2] 中,我们学习了如何使用工作队列在多个工作单元之间分配耗时任务。
但是如果我们想要运行一个在远程计算机上的函数并等待其结果呢?这将是另外一回事了。这种模式通常被称为 远程过程调用 或 RPC 。
在本篇教程中,我们将使用 RabbitMQ 构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务器。由于我们没有什么耗时任务值得分发,那干脆就创建一个返回斐波那契数列的虚拟 RPC 服务吧。
客户端接口
为了说明如何使用 RPC 服务,我们将创建一个简单的客户端类。该类将暴露一个名为Call的方法,用来发送 RPC 请求并且保持阻塞状态,直到接收到应答为止。
var rpcClient = new RPCClient();  Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response);  rpcClient.Close();关于 RPC 的说明
尽管 RPC 在计算机中是一种很常见的模式,但它经常受到批评。问题出现在当程序员不知道一个函数是本地调用还是一个耗时的 RPC 请求。这样的混淆,会导致系统不可预测,以及给调试增加不必要的复杂性。误用 RPC 可能会导致不可维护的混乱代码,而不是简化软件。
牢记这些限制,请考虑如下建议:
- 确保可以明显区分哪些函数是本地调用,哪些是远程调用。
- 为您的系统编写文档,明确组件之间的依赖关系。
- 捕获异常,当 RPC 服务长时间宕机时客户端该如何应对。
当有疑问的时候可以先避免使用 RPC。如果可以的话,考虑使用异步管道 - 而不是类似 RPC 的阻塞,其会将结果以异步的方式推送到下一个计算阶段。
回调队列
一般来讲,基于 RabbitMQ 进行 RPC 通信是非常简单的,客户端发送一个请求消息,然后服务端用一个响应消息作为应答。为了能接收到响应,我们需要在发送请求过程中指定一个'callback'队列地址。
var props = channel.CreateBasicProperties(); props.ReplyTo = replyQueueName;  var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "",                      routingKey: "rpc_queue",                      basicProperties: props,                      body: messageBytes);  // ... then code to read a response message from the callback_queue ...消息属性
AMQP 0-9-1 协议在消息中预定义了一个包含 14 个属性的集合,大多数属性很少使用,但以下情况除外:
Persistent:将消息标记为持久的(值为2)或者瞬时的(其他值),可以参考 教程[2]。DeliveryMode:熟悉 AMQP 协议的人可以选择此属性而不是熟悉协议的人可以选择使用此属性而不是Persistent,它们控制的东西是一样的。ContentType:用于描述编码的 mime 类型。例如,对于经常使用的 JSON 编码,将此属性设置为:application/json是一种很好的做法。ReplyTo:通常用于命名回调队列。CorrelationId:用于将 RPC 响应与请求相关联。
关联ID
在上面介绍的方法中,我们建议为每个 RPC 请求创建一个回调队列,但是这种方式效率低。幸运的是我们有一种更好的方式,那就是为每个客户端创建一个独立的回调队列。
这种方式会引出一个新的问题,在收到响应的回调队列中,它无法区分响应属于哪一个请求,此时便是CorrelationId属性的所用之处。我们将为每个请求的
                        
                        
                    
