好久没有更新技术博客了,今天正好有点时间,自学了一下RabbitMQ,现在就将我所学到的东西分享给大家,让你们也能同我一起进步,在程序员的技术道路上,一直前进下去。
首先学习技术,我觉得还是能在官网去学习,这样才能学到最权威,最新的技术。毕竟是初学者,如果就一味的通过百度去学习技术,有可能会遇到不断的坑,只看别人写的博客,对于初学者来说,又不知对否,只能一味的接纳,最终会遇到不可预料的坑,都不知道如何下手,所以我建议大家能去官网学习,虽然说官网都是英文的,对于一些英语能力不好的同学,可能看的会有点头疼,但是只要自己坚持下去,一直看英文文档,遇到不懂的词,可以查一下,这样日积月累,达到从量变到质变的过程,最终再看什么英文文档,都不在话下了。好了,接下来,进入正题。
一、安装RabbitMQ
链接: https://pan.baidu.com/s/1zsNlA1IB0o05AEHNjijxGA 提取码: 2sph
可以在这里下载安装文档
二、6种队列模式
RabbitMQ的官网地址是http://www.rabbitmq.com,进入官网教程(RabbitMQ Tutorials),看到有6个模式:
1、简单的队列模式

这个图是官网上面的图,P是生产者,是发送消息的一方,C是消费者,是接收消息的一方,可以把RabbitMQ和邮局类比,当你想要寄信的时候,你会把信放在邮箱里,然后等待邮差把信收走,然后送到想要寄给的那个人,唯一不同点就是邮局是处理纸质信件,然而RabbitMQ是处理二进制数据。有三个术语,生产者(producer)是发送消息的一方,队列(queue)相当于邮箱,存储消息,消费者(consumer)是接收消息的一方,注意:生产者、消费者、队列可以不再同一台机器上,可以分布在不同的机器,一个应用既可以是生产者,也可以是消费者。
接下来开始用代码实现简单队列模式:
使用maven项目完成这个例子,先在pom.xml文件里添加rabbitmq的依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.4</version>
</dependency>
①建一个工具类RabbitMQConnection,方便连接RabbitMQ
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 连接rabbitmq
*/
public class RabbitMQConnection {
private static Connection connection;
static {
ConnectionFactory connectionFactory = new ConnectionFactory();
try{
/**
* rabbitmq服务器
*/
connectionFactory.setHost("127.0.0.1");
/**
* 虚拟主机名
*/
connectionFactory.setVirtualHost("/testRabbitMQ");
/**
* 用户名
*/
connectionFactory.setUsername("renruibin");
/**
* 密码
*/
connectionFactory.setPassword("111111");
/**
* 默认端口
*/
connectionFactory.setPort(5672);
connection = connectionFactory.newConnection();
} catch (Exception e){
e.printStackTrace();
}
}
/**
* 获取连接
* @return
*/
public static Connection getConnection(){
return connection;
}
}
②建立生产者,发送消息
/**
* 生产者
*/
public class SendMessage {
/**
* 定义队列名称
*/
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception{
/**
* 1、获取连接,相当于数据库连接
*/
Connection connection = RabbitMQConnection.getConnection();
/**
* 2、创建通道,相当于statement
*/
Channel channel = connection.createChannel();
/**
* 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 4、发送消息
* 第一个参数:交换机名字
* 第二个参数:路由key,简单队列模式里,队列的名字就是路由key
* 第三个参数:可以添加额外的配置
* 第四个参数:二进制的消息数据
*/
String message = "第"+new Random().nextInt()+"条消息";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送"+message+"成功");
}
}
执行这段程序之后,会发送一条消息到RabbitMQ服务器的队列里,可以访问localhost:15672查看
目前有一个连接

目前有一个管道

目前有一个队列,而且有一条消息等待被消费

③建立消费者,去消费这条消息
public class ReceiveMessage {
/**
* 定义队列名称
*/
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception{
/**
* 1、获取连接,相当于数据库连接
*/
Connection connection = RabbitMQConnection.getConnection();
/**
* 2、创建通道,相当于statement
*/
Channel channel = connection.createChannel();
/**
* 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 4、定义队列的消费者
*/
QueueingConsumer consumer = new QueueingConsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ACK
* 第三个参数:消费者
*/
channel.basicConsume(QUEUE_NAME,true,consumer);
/**
* 6、开始消费数据
*/
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("收到"+new String(delivery.getBody())+"成功");
}
}
