Spring Cloud 系列之 Spring Cloud Stream
Spring Cloud Stream 是消息中间件组件,它集成了 kafka 和 rabbitmq 。本篇文章以 Rabbit MQ 为消息中间件系统为基础,介绍 Spring Cloud Stream 的使用。如果你没有用过消息中间件,可以到 RabbitMQ 的官网看一下,或者参考这个 upload/201909241554258578.png" style="margin: 0px; padding: 0px; border: none; max-width: 800px; height: auto;" alt="" />
可能看完了上面的三个概念仍然是一头雾水,没有关系,实践过程中自然就明白了。
先来一个最简单的例子
因为用到的是 rabbitmq,所以在本地搭好 rabbitmq 环境,然后装好 rabbitmq-management 插件,这样就可以访问 web UI 界面了,默认是 15672 端口。
1、引用对应 rabbitmq 的 stream 包
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
2、在 application.yml 中增加配置
spring: profiles: stream-rabbit-customer-group1 cloud: stream: bindings: input: destination: default.messages binder: local_rabbit output: destination: default.messages binder: local_rabbit binders: local_rabbit: type: rabbit environment: spring: rabbitmq: host: localhost port: 32775 username: guest password: guest server: port: 8201
理解配置文件很重要,基本上理解清楚了配置,也就明白 spring cloud stream 是怎么回事了。
spring.cloud.stream.binders
,上面提到了 stream 的 3 个重要概念的第一个 「Destination binders」。上面的配置文件中就配置了一个 binder,命名为 local_rabbit,指定 type 为 rabbit ,表示使用的是 rabbitmq 消息中间件,如果用的是 kafka ,则 type 设置为 kafka。environment 就是设置使用的消息中间件的配置信息,包括 host、port、用户名、密码等。可以设置多了个 binder,适配不同的场景。
spring.cloud.stream.bindings
,对应上面提到到 「Destination Bindings」。这里面可以配置多个 input 或者 output,分别表示消息的接收通道和发送通道,对应到 rabbitmq 上就是不同的 exchange。这个配置文件里定义了两个input 、两个output,名称分别为 input、log_input、output、log_output。这个名称不是乱起的,在我们的程序代码中会用到,用来标示某个方法接收哪个 exchange 或者发送到哪个 exchange 。
每个通道下的 destination 属性指 exchange 的名称,binder 指定在 binders 里设置的 binder,上面配置中指定了 local_rabbit 。
可以看到 input、output 对应的 destination 是相同的,log_input、log_output 对应的 destination 也相同, 也就是对应相同的 exchange。一个表示消息来源,一个表示消息去向。
另外还可以设置 group 。因为服务很可能不止一个实例,如果启动多个实例,那么没必要每个实例都消费同一个消息,只要把功能相同的实例的 group 设置为同一个,那么就会只有一个实例来消费消息,避免重复消费的情况。如果设置了 group,那么 group 名称就会成为 queue 的名称,如果没有设置 group ,那么 queue 就会根据 destination + 随机字符串的方式命名。
3、接下来做一个最简单的例子,来演示如何接收消息。
首先来介绍一下 stream 内置的简单消息通道(消息通道也就是指消息的来源和去向)接口定义,一个 Source 和 一个 Sink 。
Source.java
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface Source { String OUTPUT = "output"; @Output("output") MessageChannel output(); }
消息发送通道定义,定义了一个 MessageChannel 类型的 output() 方法,用 @Output
注解标示,并指定了 binding 的名称为 output。
Sink.java