模型
生产者
复制代码
1 package cn.wh;
2
3 import java.io.IOException;
4 import java.util.concurrent.TimeoutException;
5
6 import cn.util.RabbitMqConnectionUtil;
7
8 import com.rabbitmq.client.Channel;
9 import com.rabbitmq.client.Connection;
10
11 public class Send {
12
13 private static final String EXCHANGE_NAME="test_exchange_direct";
14
15 public static void main(String[] args) throws IOException, TimeoutException {
16
17
18 Connection connection = RabbitMqConnectionUtil.getConnection();
19
20 Channel channel = connection.createChannel();
21
22 //exchange
23 channel.exchangeDeclare(EXCHANGE_NAME, "direct");
24
25 String msg="hello direct!";
26
27
28 String routingKey="error";
29 channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
30
31 System.out.println("send "+msg);
32
33 channel.close();
34 connection.close();
35 }
36 }
复制代码
消费者
复制代码
1 package cn.wh;
2
3 import java.io.IOException;
4 import java.util.concurrent.TimeoutException;
5
6
7 import cn.util.RabbitMqConnectionUtil;
8 import com.rabbitmq.client.Channel;
9 import com.rabbitmq.client.Connection;
10 import com.rabbitmq.client.Consumer;
11 import com.rabbitmq.client.DefaultConsumer;
12 import com.rabbitmq.client.Envelope;
13 import com.rabbitmq.client.AMQP.BasicProperties;
14
15 public class Recv1 {
16 private static final String EXCHANGE_NAME = "test_exchange_direct";
17 private static final String QUEUE_NAME = "test_queue_direct_1";
18
19 public static void main(String[] args) throws IOException, TimeoutException {
20
21 Connection connection = RabbitMqConnectionUtil.getConnection();
22 final Channel channel = connection.createChannel();
23
24 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25
26
27 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
28
29 channel.basicQos(1);
30
31 //定义一个消费者
32 Consumer consumer=new DefaultConsumer(channel){
33 //消息到达 触发这个方法
34 @Override
35 public void handleDelivery(String consumerTag, Envelope envelope,
36 BasicProperties properties, byte[] body) throws IOException {
37
38 String msg=new String(body,"utf-8");
39 System.out.println("[1] Recv msg:"+msg);
40
41 try {
42 Thread.sleep(2000);
43 } catch (InterruptedException e) {
44 e.printStackTrace();
45 }finally{
46 System.out.println("[1] done ");
47 channel.basicAck(envelope.getDeliveryTag(), false);
48 }
49 }
50 };
51
52 boolean autoAck=false;//自动应答 false
53 channel.basicConsume(QUEUE_NAME,autoAck , consumer);
54 }
55
56 }
复制代码
消费者2
复制代码
1 package cn.wh;
2
3 import java.io.IOException;
4 import java.util.concurrent.TimeoutException;
5
6
7 import cn.util.RabbitMqConnectionUtil;
8 import com.rabbitmq.client.Channel;
9 import com.rabbitmq.client.Connection;
10 import com.rabbitmq.client.Consumer;
11 import com.rabbitmq.client.DefaultConsumer;
12 import com.rabbitmq.client.Envelope;
13 import com.rabbitmq.client.AMQP.BasicProperties;
14
15 public class Recv2 {
16 private static final String EXCHANGE_NAME = "test_exchange_direct";
17 private static final String QUEUE_NAME = "test_queue_direct_2";
18
19 public static void main(String[] args) throws IOException, TimeoutException {
20
21 Connection connection = RabbitMqConnectionUtil.getConnection();
22 final Channel channel = connection.createChannel();
23
24 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25
26
27 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
28 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
29 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
30
31 channel.basicQos(1);
32
33 //定义一个消费者
34 Consumer consumer=new DefaultConsumer(channel){
35 //消息到达 触发这个方法
36 @Override
37 public void handleDelivery(String consumerTag, Envelope envelope,
38 BasicProperties properties, byte[] body) throws IOException {
39
40 String msg=new String(body,"utf-8");
41 System.out.println("[2] Recv msg:"+msg);
42
43 try {
44 Thread.sleep(2000);
45 } catch (InterruptedException e) {
46 e.printStackTrace();
47 }finally{
48 System.out.println("[2] done ");
49 channel.basicAck(envelope.getDeliveryTag(), false);
50 }
51 }
52 };
53
54 boolean autoAck=false;//自动应答 false
55 channel.basicConsume(QUEUE_NAME,autoAck , consumer);
56 }
57
58 }
复制代码
Topic模型
复制代码
1 public class Send {
2 private final static String EXCHANGE_NAME = "test_exchange_topic";
3 public static void main(String[] argv) throws Exception {
4 // 获取到连接以及mq通道
5 Connection connection = ConnectionUtils.getConnection();
6 Channel channel = connection.createChannel();
7 // 声明exchange
8 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
9 // 消息内容
10 String message = "id=1001";
11 channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
12 System.out.println(" [x] Sent '" + message + "'");
13 channel.close();
14 connection.close();
15 }
16 }
复制代码
消费者
复制代码
1 public class Recv {
2 private final static String QUEUE_NAME = "test_queue_topic_1";
3 private final static String EXCHANGE_NAME = "test_exchange_topic";
4 public static void main(String[] argv) throws Exception {
5 // 获取到连接以及mq通道
6 Connection connection = ConnectionUtils.getConnection();
7 final Channel channel = connection.createChannel();
8 // 声明队列
9 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
10 // 绑定队列到交换机
11 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
12 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
13 // 同一时刻服务器只会发一条消息给消费者
14 channel.basicQos(1);
15 // 定义队列的消费者
16 Consumer consumer = new DefaultConsumer(channel) {
17 // 消息到达 触发这个方法
18 @Override
19 public void handleDelivery(String consumerTag, Envelope envelope,
20 BasicProperties properties, byte[] body) throws IOException {
21 String msg = new String(body, "utf-8");
22 System.out.println("[2] Recv msg:" + msg);
23 try {
24 Thread.sleep(1000);
25 } catch (InterruptedException e) {
26 e.printStackTrace();
27 } finally {
28 System.out.println("[2] done ");
29 // 手动回执
30 channel.basicAck(envelope.getDeliveryTag(), false);
31 }
32 }
33 };
34 boolean autoAck = false;
35 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
36 }
37 }
复制代码
消费者2
复制代码
1 public class Recv {
2 private final static String QUEUE_NAME = "test_queue_topic_1";
3 private final static String EXCHANGE_NAME = "test_exchange_topic";
4 public static void main(String[] argv) throws Exception {
5 // 获取到连接以及mq通道
6 Connection connection = ConnectionUtils.getConnection();
7 final Channel channel = connection.createChannel();
8 // 声明队列
9 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
10 // 绑定队列到交换机
11 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
12 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
13 // 同一时刻服务器只会发一条消息给消费者
14 channel.basicQos(1);
15 // 定义队列的消费者
16 Consumer consumer = new DefaultConsumer(channel) {
17 // 消息到达 触发这个方法
18 @Override
19 public void handleDelivery(String consumerTag, Envelope envelope,
20 BasicProperties properties, byte[] body) throws IOException {
21 String msg = new String(body, "utf-8");
22 System.out.println("[2] Recv msg:" + msg);
23 try {
24 Thread.sleep(1000);
25 } catch (InterruptedException e) {
26 e.printStackTrace();
27 } finally {
28 System.out.println("[2] done ");
29 // 手动回执
30 channel.basicAck(envelope.getDeliveryTag(), false);
31 }
32 }
33 };
34 boolean autoAck = false;
35 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
36 }
37 }
复制代码
Exchanges(转发器|交换机)
转发器一方面它接受生产者的消息,另一方面向队列推送消息
Nameless exchange(匿名转发)
之前我们对转换器一无所知,却可以将消息发送到队列,那是可能是我们用了默认的转发器,转发器名为空字符串""。之前我们发布消息的代码
Fanout Exchange
不处理路由键。你只需要将队列绑定到交换机上。发送消息到交换机都会被转发到与该交换机绑定的所有队列
Direct Exchange
处理路由键。
需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发 dog.puppy,也不会转发dog.guard,只会转发 dog。
Topic Exchange
将路由键和某模式进行匹配。
此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
作者:HelloWorld
欢迎任何形式的转载,但请务必注明出处。
限于本人水平,如果文章和代码有表述不当之处,还请不吝赐教。https://www.cnblogs.com/wh1520577322/p/10071505.html