RabbitMQ是一个开源的消息中间件,自带管理界面友好、开发语言支持广泛、没有对其它中间件的依赖,而且社区非常活跃,特别适合中小型企业拿来就用。这篇文章主要探讨提升RabbitMQ消费速度的一些方法和实践,比如增加消费者、提高Prefetch count、多线程处理、批量Ack等。

增加消费者

这个道理比较容易理解,多个人搬砖的速度肯定比一个人要快很多。

不过实际情况中还需要面对一些技术挑战,比如后端处理能力、并发冲突,以及处理顺序。

后端处理能力:比如多个消费者都要操作数据库,那么数据库连接的并发数和读写吞吐量就是后端处理能力,如果达到了数据库的最大处理能力,增加再多的消费者也没有用,甚至会因为数据库拥塞导致整体消费速度的下降。这个问题还存在另一种情况,就是消费者是否真正的发挥了后端服务的处理能力,比如使用Redis时是否采用了多线程、IO复用等方式来进一步提升吞吐量。

并发冲突:比如两个消费者都要去修改用户的积分,单个消费者的做法可能就是取出来、改下字段的值、最后再update到数据库,多个消费者时如果同时取出了相同的数据,还这样处理的话就会出问题了。这时候可能需要修改下SQL语句,直接在SQL语句中修改积分,由数据库写入事务来处理并发冲突;或者搞一个分布式锁,对于具体的某个用户同时只能有一个消费者来处理其积分。

处理顺序:如果消息需要被顺序处理,那么各个消费者之间还需要增加一个同步机制。比如基于GPS定位的电子围栏,在出围栏的某个时段,先产生了围栏内定位消息、然后产生了围栏外定位消息;如果围栏外定位消息先被一个消费者处理,则判定为出围栏,这没有问题;然后围栏内定位消息被另一个消费者处理,则会被判定为入围栏,这个就属于误判了。这时候可能要同步一个已处理定位时间,早于这个时间的定位就抛弃掉;或者同一个设备的定位消息通过某种算法控制只能由某个消费者进行处理。

解决后边两个问题的方法不可避免的要引入多个消费者之间的协商机制,如果这些协商机制设计不好会对处理速度带来很大影响。因此多人搬砖速度快的前提是多个人搬砖时不需要大家频繁的坐下来协商谁搬哪块砖,否则就会浪费很多时间在相互协调上,反而不能提升搬砖的速度。

所以通过增加消费者提升消费速度得以成立的前提是消费者业务并发处理能力要足够,消费者依赖的后端服务处理能力也要足够。这是此种方式的关键点。

提高Prefetch count

消息消费速度主要受到发送消息时间、消费者处理时间、消息Ack时间这几个时间的影响,如果一个消息走完这个流程再发送另一个的话,效率将会非常低。可以让消息在这几个时间内恰当的分配,让消息总是连续不断的被消费者接收处理,就可以提升消费者的消费速度。

根据如上描述,有些消息可能正在被消费者处理,有些可能在等待消费者处理,有的消息可能还在网络传输中,而如果不限制传输的数量,消费者端可能因处理能力补足会堆积大量的消息,首先内存使用将不可控制,其次此时也无法将这些消息再分配给别的消费者。因此才有了Prefetch count,用于控制消息发送给消费者的速度;这个方案需要配合Ack使用,消费者回复消息Ack后,RabbitMQ才会继续发送同等数量的消息到消费者。提高Prefetch count到一个合适的值可以提升消息的消费速度。这个值的设定可能还要实时参考上边提到的三个时间,这有点类似TCP的流控措施。这个值的计算方法请看下文:

RabbitMQ关于吞吐量,延迟和带宽的一些理论

参考文档:https://blog.csdn.net/gbbqrglvir3dyi82/article/details/78663828

多线程处理

多线程处理和增加消费者有异曲同工之妙。多线程处理不需要建立多个到RabbitMQ的连接,它在收到队列消息后将其放入不同的线程中进行处理,这样进程中就会有多个消息同时处理,增加了消费吞吐量,从而提升了消费速度。

来看一个例子:

复制代码
consumer.Received += (o, e) =>     {         ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessSingleContextMessage), e);     };
复制代码

 

在这个例子中波斯码将收到的消息放入线程池队列进行处理,注意这里需要配合上一节提到的Prefetch count,设置一个合适的值,消费者就可以同时处理多条消息了。

多线程处理也存在多消费者处理时的问题,只不过在一个进程中处理并发冲突和消息顺序的成本可能更低一些。下边的代码片段展示了一个解决消息顺序处理问题的方案:

复制代码
// 接收消息存入列表,当接收数量达到prefetchCount/2时就加入处理队列; // 1/2是考虑了消息从RabbitMQ到消费者的传输时间,不需要等所有的消息都到达了才开始处理。consumer.Received += (o, e) =>     {         lock(receiveLocker){             basicDeliverEventArgsList.Add(e);             if (basicDeliverEventArgsList.Count >= prefetchCount/2)             {                     var deliverEventArgs = basicDeliverEventArgsList.ToArray();                     basicDeliverEventArgsList.Clear();                     EnProcessQueue(deliverEventArgs);             }         }     };  // 此处省略数据出队列的代码,请自行脑补....  // 然后这个方法是用来处理消息的,将消息根据数据Key分成若干组,放到多个任务中并行处理; // 相同数据Key的消息将分配到一个组中,在这个组中数据被顺序处理private void Process(BasicDeliverEventArgs[] args) {     if (args.Length <= 0)     {         return;     }      try     {         var tasks = CreateParallelProcessTasksByDataKey(args);         Task.WaitAll(tasks);     }     catch (Exception ex)     {         ToLog("处理任务发生异常", ex);     } }  // 创建并行处理多条消息的任务private Task[] CreateParallelProcessTasksByDataKey(BasicDeliverEventArgs[] args) {     // 根据dataKey进行分组,dataKey可以放到消息的header中进行传输,这里就不给出具体的分组方法了    Dictionary<string, List<DeliverObject>> eDic = GetMessgeGroupByDataKey(args);