顺序、广播、定时任务
前插
在进行常用的三种消息类型例子展示的时候,我们先来说一说RocketMQ的几个重要概念:
- PullConsumer与PushConsumer:主要区别在于Pull与Push的区别。对于PullConsumer,消费者会主动从broker中拉取消息进行消费。而对于PushConsumer,会封装包含消息获取、消息处理以及其他相关操作的接口给程序调用
- Tag: Tag可以看做是一个子主题(sub-topic),可以进一步细化主题下的相关子业务。提高程序的灵活性和可扩展性
- Broker:RocketMQ的核心组件之一。用来从生产者处接收消息,存储消息以及将消息推送给消费者。同时RocketMQ的broker也用来存储消息相关的数据,比如消费者组、消费处理的偏移量、主题以及消息队列等
- Name Server: 可以看做是一个信息路由器。生产者和消费者从NameServer中查找对应的主题以及相应的broker
实例
这里我们不玩虚的,直接将三个类型的生产者,消费者代码实例给出(在官网给出的例子上做了些许改动和注释说明):
生产者代码
/** * 多种类型组合消息测试 * @author ziyuqi * */ public class MultiTypeProducer { public static void main(String[] args) throws Exception { // 顺序消息生产者 FIFO OrderedProducer orderedProducer = new OrderedProducer(); orderedProducer.produce(); // 广播消息生产者 /*BroadcastProducer broadcastProducer = new BroadcastProducer(); broadcastProducer.produce();*/ // 定时任务消息生产者 /*ScheduledProducer scheduledProducer = new ScheduledProducer(); scheduledProducer.produce();*/ } } /** * 按顺序发送消息的生产者 * @author ziyuqi * */ class OrderedProducer { public void produce() throws Exception { DefaultMQProducer producer = new DefaultMQProducer("GroupD"); producer.setNamesrvAddr("localhost:9876"); producer.start(); String[] tags = new String[] {"tagA", "tagB", "tagC", "tagD", "tagE"}; for (int i=0; i<50; i++) { Message message = new Message("OrderedTopic", tags[i % tags.length], "KEY" + i, ("Ordered Msg:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(message, new MessageQueueSelector() { /** * 所谓的顺序,只能保证同一MessageQueue放入的消息满足FIFO。该方法返回应该将消息放入那个MessageQueue,最后一个参数为send传入的最后一个参数 * 如果需要全局保持FIFO,则所有消息应该依次放入同一队列中去mqs队列中的同一下标 */ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // 消息被分开放入多个队列,每个队列中的消息保证按顺序被消费FIFO /*int index = (Integer) arg % mqs.size(); System.out.println("QueueSize:" + mqs.size()); return mqs.get(index);*/ // 消息全部放入同一队列,全局保持顺序性 return mqs.get(0); } }, i); System.out.println(sendResult); } producer.shutdown(); } } /** * 广播生产者 * @author ziyuqi * */ class BroadcastProducer { public void produce() throws Exception { DefaultMQProducer producer = new DefaultMQProducer("GroupA"); // 也必须设置nameServer producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i=0; i<50; i++) { Message message = new Message("BroadcastTopic", "tagA", "OrderID188", ("Ordered Msg:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(message); System.out.println(sendResult); } producer.shutdown(); } } /** * 定时消息发送者 * @author ziyuqi * */ class ScheduledProducer { public void produce() throws Exception { DefaultMQProducer produce
