RocketMQ专题2:三种常用生产消费方式(顺序、广播、定时)以及顺序消费源码探究

 

顺序、广播、定时任务

前插

​ 在进行常用的三种消息类型例子展示的时候,我们先来说一说RocketMQ的几个重要概念:

  • PullConsumer与PushConsumer:主要区别在于Pull与Push的区别。对于PullConsumer,消费者会主动从broker中拉取消息进行消费。而对于PushConsumer,会封装包含消息获取、消息处理以及其他相关操作的接口给程序调用
  • Tag: Tag可以看做是一个子主题(sub-topic),可以进一步细化主题下的相关子业务。提高程序的灵活性和可扩展性
  • Broker:RocketMQ的核心组件之一。用来从生产者处接收消息,存储消息以及将消息推送给消费者。同时RocketMQ的broker也用来存储消息相关的数据,比如消费者组、消费处理的偏移量、主题以及消息队列等
  • Name Server: 可以看做是一个信息路由器。生产者和消费者从NameServer中查找对应的主题以及相应的broker

实例

​ 这里我们不玩虚的,直接将三个类型的生产者,消费者代码实例给出(在官网给出的例子上做了些许改动和注释说明):

生产者代码

/**  * 多种类型组合消息测试  * @author ziyuqi  *  */ public class MultiTypeProducer {     public static void main(String[] args) throws Exception {         // 顺序消息生产者  FIFO         OrderedProducer orderedProducer = new OrderedProducer();         orderedProducer.produce();                  // 广播消息生产者         /*BroadcastProducer broadcastProducer = new BroadcastProducer();         broadcastProducer.produce();*/                  // 定时任务消息生产者         /*ScheduledProducer scheduledProducer = new ScheduledProducer();         scheduledProducer.produce();*/     } }  /**  * 按顺序发送消息的生产者   * @author ziyuqi  *  */ class OrderedProducer {     public void produce() throws Exception {         DefaultMQProducer producer = new DefaultMQProducer("GroupD");         producer.setNamesrvAddr("localhost:9876");         producer.start();         String[] tags = new String[] {"tagA", "tagB", "tagC", "tagD", "tagE"};         for (int i=0; i<50; i++) {             Message message = new Message("OrderedTopic", tags[i % tags.length], "KEY" + i, ("Ordered Msg:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));             SendResult sendResult = producer.send(message, new MessageQueueSelector() {                 /**                  * 所谓的顺序,只能保证同一MessageQueue放入的消息满足FIFO。该方法返回应该将消息放入那个MessageQueue,最后一个参数为send传入的最后一个参数                  * 如果需要全局保持FIFO,则所有消息应该依次放入同一队列中去mqs队列中的同一下标                  */                 @Override                 public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {                     // 消息被分开放入多个队列,每个队列中的消息保证按顺序被消费FIFO                     /*int index = (Integer) arg % mqs.size();                     System.out.println("QueueSize:" + mqs.size());                     return mqs.get(index);*/                                          // 消息全部放入同一队列,全局保持顺序性                      return mqs.get(0);                 }             }, i);             System.out.println(sendResult);         }         producer.shutdown();     } }  /**  * 广播生产者  * @author ziyuqi  *  */ class BroadcastProducer {     public void produce() throws Exception {         DefaultMQProducer producer = new DefaultMQProducer("GroupA");         // 也必须设置nameServer         producer.setNamesrvAddr("localhost:9876");         producer.start();         for (int i=0; i<50; i++) {             Message message = new Message("BroadcastTopic", "tagA", "OrderID188", ("Ordered Msg:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));             SendResult sendResult = producer.send(message);             System.out.println(sendResult);         }         producer.shutdown();     } }  /**  * 定时消息发送者  * @author ziyuqi  *  */ class ScheduledProducer {     public void produce() throws Exception {         DefaultMQProducer produce
                    
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信