Spring Cloud Stream 是消息中间件组件,它集成了 kafka 和 rabbitmq 。本篇文章以 Rabbit MQ 为消息中间件系统为基础,介绍 Spring Cloud Stream 的使用。如果你没有用过消息中间件,可以到 RabbitMQ 的官网看一下,或者参考这个 upload/201909241554258578.png" style="margin: 0px; padding: 0px; border: none; max-width: 800px; height: auto;" alt="" />

可能看完了上面的三个概念仍然是一头雾水,没有关系,实践过程中自然就明白了。

先来一个最简单的例子

因为用到的是 rabbitmq,所以在本地搭好 rabbitmq 环境,然后装好 rabbitmq-management 插件,这样就可以访问 web UI 界面了,默认是 15672 端口。

1、引用对应 rabbitmq 的 stream 包

<dependency>    <groupId>org.springframework.cloud</groupId>     <artifactId>spring-cloud-starter-stream-rabbit</artifactId>  </dependency>

2、在 application.yml 中增加配置

spring:   profiles: stream-rabbit-customer-group1   cloud:     stream:       bindings:         input:           destination: default.messages           binder: local_rabbit         output:           destination: default.messages           binder: local_rabbit       binders:         local_rabbit:           type: rabbit           environment:             spring:               rabbitmq:                 host: localhost                 port: 32775                 username: guest                 password: guest server:   port: 8201

理解配置文件很重要,基本上理解清楚了配置,也就明白 spring cloud stream 是怎么回事了。

spring.cloud.stream.binders,上面提到了 stream 的 3 个重要概念的第一个 「Destination binders」。上面的配置文件中就配置了一个 binder,命名为 local_rabbit,指定 type 为 rabbit ,表示使用的是 rabbitmq 消息中间件,如果用的是 kafka ,则 type 设置为 kafka。environment 就是设置使用的消息中间件的配置信息,包括 host、port、用户名、密码等。可以设置多了个 binder,适配不同的场景。

spring.cloud.stream.bindings ,对应上面提到到 「Destination Bindings」。这里面可以配置多个 input 或者 output,分别表示消息的接收通道和发送通道,对应到 rabbitmq 上就是不同的 exchange。这个配置文件里定义了两个input 、两个output,名称分别为 input、log_input、output、log_output。这个名称不是乱起的,在我们的程序代码中会用到,用来标示某个方法接收哪个 exchange 或者发送到哪个 exchange 。

每个通道下的 destination 属性指 exchange 的名称,binder 指定在 binders 里设置的 binder,上面配置中指定了 local_rabbit 。

可以看到 input、output 对应的 destination 是相同的,log_input、log_output 对应的 destination 也相同, 也就是对应相同的 exchange。一个表示消息来源,一个表示消息去向。

另外还可以设置 group 。因为服务很可能不止一个实例,如果启动多个实例,那么没必要每个实例都消费同一个消息,只要把功能相同的实例的 group 设置为同一个,那么就会只有一个实例来消费消息,避免重复消费的情况。如果设置了 group,那么 group 名称就会成为 queue 的名称,如果没有设置 group ,那么 queue 就会根据 destination + 随机字符串的方式命名。

3、接下来做一个最简单的例子,来演示如何接收消息。

首先来介绍一下 stream 内置的简单消息通道(消息通道也就是指消息的来源和去向)接口定义,一个 Source 和 一个 Sink 。

Source.java

import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel;  public interface Source {     String OUTPUT = "output";      @Output("output")     MessageChannel output(); }

消息发送通道定义,定义了一个 MessageChannel 类型的 output() 方法,用 @Output 注解标示,并指定了 binding 的名称为 output。

Sink.java