消息队列中间件(二)使用 ActiveMQ

 

ActiveMQ 介绍

Active MQ 是由 Apache 出品的一款流行的功能强大的开源消息中间件,它速度快,支持跨语言的客户端,具有易于使用的企业集成模式和许多的高级功能,同时完全支持 JSM1.1 和 J2EE1.4 。

ActiveMQ 特点

  • 支持Java,C,C ++,C#,Ruby,Perl,Python,PHP等各种跨语言客户端和协议,如 OpenWire , Stomp , AMQP , MQTT.
  • 完全支持JMS 1.1和 J2EE 1.4,支持瞬态,持久,事务和XA消息传递。
  • 对 Spring 框架的支持以便ActiveMQ可以轻松嵌入到Spring应用程序中。
  • 通过了常见的 J2EE 服务器测试,如 TomEE,Geronimo,JBoss,GlassFish 和 WebLogic 。
  • 连接方式的多样化,ActiveMQ 提供了多种连接模式,例如 in-VM、TCP、SSL、NIO、UDP、多播、JGroups、JXTA。
  • 可以通过使用 JDBC 和 journal 实现消息的快速持久化。
  • 专为高性能群集,客户端 - 服务器,点对点通信而设计。
  • 提供与语言无关的 REST API。
  • 支持 Ajax 方式调用 ActiveMQ。
  • ActiveMQ 可以轻松地与 CXF、Axis 等 Web Service 技术整合,以提供可靠的消息传递。
  • 可用作为内存中的 JMS 提供者,非常适合 JMS 单元测试。

ActiveMQ 消息

  1. 点对点队列模式
    消息到达消息系统,被保留在消息队列中,然后由一个或者多个消费者消费队列中的消息,一个消息只能被一个消费者消费,然后就会被移除。例如订单处理系统。
  2. 发布-订阅模式
    消息发送时指定主题(或者说通道),消息被保留在指定的主题中,消费者可以订阅多个主题,并使用主题中的所有的消息,例如现实中的电视与电视频道。所有客户端包括发布者和订阅者,主题中的消息可以被所有的订阅者消费,消费者只能消费订阅之后发送到主题中的消息。

ActiveMQ 概念

  • Broker,消息代理,表示消息队列服务器实体,接受客户端连接,提供消息通信的核心服务。
  • Producer,消息生产者,业务的发起方,负责生产消息并传输给 Broker 。
  • Consumer,消息消费者,业务的处理方,负责从 Broker 获取消息并进行业务逻辑处理。
  • Topic,主题,发布订阅模式下的消息统一汇集地,不同生产者向 Topic 发送消息,由 Broker 分发到不同的订阅者,实现消息的广播。
  • Queue,队列,点对点模式下特定生产者向特定队列发送消息,消费者订阅特定队列接收消息并进行业务逻辑处理。
  • Message,消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务 数据,实现消息的传输。

ActiveMQ 工程实例

下面是使用 ActiveMQ 的队列模式和发布-订阅模式的 Java 代码示例。

POM 依赖

        <!-- Active-MQ -->         <dependency>             <groupId>org.apache.activemq</groupId>             <artifactId>activemq-all</artifactId>             <version>5.15.5</version>         </dependency>

队列模式消费者

import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*;  /**  * <p>  * 消息消费者,用于消费消息  *  * @Author niujinpeng  * @Date 2018/9/4 23:45  */ public class AppConsumer {      private static final String url = "tcp://127.0.0.1:61616";     private static final String queueName = "queue-test";      public static void main(String[] args) throws JMSException {         // 1.创建ConnectionFactory         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();         // 2.创建Connection         Connection connection = connectionFactory.createConnection();         // 3.启动连接         connection.start();          // 4.创建会话,false,不使用事务,自动应答模式         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);         // 5.创建一个目标         Destination destination = session.createQueue(queueName);         // 6.创建消费者         MessageConsumer consumer = session.createConsumer(destination);          // 7.创建一个监听器         consumer.setMessageListener(new MessageListener() {             public void onMessage(Message message) {                 TextMessage textMessage = (TextMessage) message;                 try {                     System.out.println(
                        
关键字:
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信