目录
主要角色
首先我们必须需要搞明白
MQ (消息队列)中的三个基本角色Producer: 消息生产者,负责生产消息并发送到BrokerBroker: 消息处理中心,负责接受消息,存储消息,转发消息Consumer:消息消费者,负责消费消息
整体架构如下所示

自定义协议
首先从上一篇中介绍了协议的相关信息,具体厂商的
MQ(消息队列)需要遵循某种协议或者自定义协议 , 消息的生产者和消费者需要遵循其协议(约定)才能后成功地生产消息和生产消息,所以在这里我们自定义一个协议如下.消息处理中心 : 如果接收到的信息包含"SEND"字符串,即视为生产者发送的消息,消息处理中心需要将此信息存储等待消费者消费
消息处理中心 : 如果接受到的信息为CONSUME,既视为消费者发送消费请求,需要将存储的消息队列头部的信息转发给消费者,然后将此消息从队列中移除
消息处理中心 : 如果消息处理中心存储的消息满3条仍然没有消费者进行消费,则不再接受生产者的生产请求
消息生产者:需要遵循协议将生产的消息头部增加 "SEND:" 表示生产消息
消息消费者:需要遵循协议向消息处理中心发送 "CONSUME"字符串表示消费消息
流程顺序
项目构建流程
下面将整个MQ的构建流程过一遍
- 新建一个
Broker类,内部维护一个ArrayBlockingQueue队列,提供生产消息和消费消息的方法,仅仅具备存储服务功能 - 新建一个
BrokerServer类,将Broker发布为服务到本地9999端口,监听本地9999端口的Socket链接,在接受的信息中进行我们的协议校验, 这里仅仅具备接受消息,校验协议,转发消息功能; - 新建一个
MqClient类,此类提供与本地端口9999的Socket链接 ,仅仅具备生产消息和消费消息的方法 - 测试:新建两个
MyClient类对象,分别执行其生产方法和消费方法
具体使用流程
- 生产消息:客户端执行生产消息方法,传入需要生产的信息,该信息需要遵循我们自定义的协议,消息处理中心服务在接受到消息会根据自定义的协议校验该消息是否合法,如果合法如果合法就会将该消息存储到Broker内部维护的
ArrayBlockingQueue队列中.如果ArrayBlockingQueue队列没有达到我们协议中的最大长度将将消息添加到队列中,否则输出生产消息失败. - 消息消息:客户端执行消费消息方法,
Broker服务会校验请求的信息的信息是否等于CONSUME,如果验证成功则从Broker内部维护的ArrayBlockingQueue队列的Poll出一个消息返回给客户端
代码演示
消息处理中心 Broker
/** * 消息处理中心 */ public class Broker
