参考程序员DD大佬的文章,自己新建demo学习学习,由于需要消息回执,看到了@SendTo这个注解能够实现,下面开始学习demo,新建两个项目cloud-stream-consumer消费端 和 cloud-stream-consumer 生产端 public interface StreamReceive { @Input("MQRece") SubscribableChannel mqReceive(); } 添加一个StreamReceive接口,定义@input通道 @Component @Slf4j public class ReceiveListener { @StreamListener("MQRece") public byte[] receive(byte[] bytes){ log.info("接受消息:"+new String(bytes)); return "ok".getBytes(); } } 添加消息监听,接受消息定义为byte[] 添加application.properties配置文件信息 spring.cloud.stream.rocketmq.binder.namesrv-addr= 192.168.211.11:9876 spring.cloud.stream.bindings.MQRece.destination=message-topic spring.cloud.stream.bindings.MQRece.group=rece-group server.port=19999 为MQRece通道添加主题message-topic,组名rece-group 到此Stream 客户端消费就完成了,本节需要把@SendTo注解用起来,需要新建一个MessageChannel进行产生消息 public interface MsgBackPush { @Output("back-push") MessageChannel backPush(); } 然后在ReceiveListener添加@SendTo @Component @Slf4j public class ReceiveListener { @StreamListener("MQRece") @SendTo("back-push") public byte[] receive(byte[] bytes){ log.info("接受消息:"+new String(bytes)); return "ok".getBytes(); } } 新增通道配置application.properties spring.cloud.stream.bindings.back-push.destination=back-topic spring.cloud.stream.bindings.back-push.group=back-group SpringBoot启动类记得添加 @EnableBinding(value = {StreamReceive.class,MsgBackPush.class}) @SpringBootApplication @EnableBinding(value = {StreamReceive.class,MsgBackPush.class}) public class CloudStreamConsumerApplication { public static void main(String[] args) { SpringApplication.run(CloudStreamConsumerApplication.class, args); } } 到此,cloud-stream-consumer这个demo就完成了 接下来看看 cloud-stream-producer public interface StreamPush { @Output("MQPush") MessageChannel mqPush(); } 定义一个通道名为MQPush,进行消息生产 public interface ProducerReceive { @Input("producer-receive") SubscribableChannel producerReceive(); } 定义一个通道名为producer-receive,进行回执消息的消费 @Component @Slf4j public class ProducerListener { @StreamListener("producer-receive") public void producerReceive(byte[] bytes){ log.info("come back message:"+new String(bytes)); } } 具体回执消息处理逻辑,再来看看application.properties spring.cloud.stream.rocketmq.binder.namesrv-addr= 192.168.214.191:9876 spring.cloud.stream.bindings.MQPush.destination=message-topic spring.cloud.stream.bindings.MQPush.group=push-group spring.cloud.stream.bindings.producer-receive.destination=back-topic spring.cloud.stream.bindings.producer-receive.group=back-group server.port=20000 为通道设置topic和group,新建一个Http接口测试一下成果 @SpringBootApplication @EnableBinding(value = {StreamPush.class,ProducerReceive.class}) @RestController public class CloudStreamProducerApplication { public static void main(String[] args) { SpringApplication.run(CloudStreamProducerApplication.class, args); } @Autowired private StreamPush streamPush; @GetMapping("/sendMessage") public String sendMessage(){ streamPush.mqPush().send(MessageBuilder.withPayload("message body".getBytes()).build()); return "ok"; } } 访问http://localhost:20000/sendMessage,结果图如下 cloud-stream-consumer日志输出 file cloud-stream-producer日志输出 file 学习@ServiceActivator这个注解,上面的项目cloud-stream-consumer ReceiveListener类中添加 @Component @Slf4j public class ReceiveListener { @StreamListener("MQRece") @SendTo("back-push") public byte[] receive(byte[] bytes){ log.info("接受消息:"+new String(bytes)); // 抛出异常 if(1==1){ throw new RuntimeException("Message consumer failed!"); } return "ok".getBytes(); } @Autowired private MsgBackPush msgBackPush; @ServiceActivator(inputChannel = "message-topic.rece-group.errors") public void error(Message message){ log.info("消费者消费消息失败:"+message); msgBackPush.backPush().send(MessageBuilder.withPayload("消息消费失败".getBytes()).build()); } } 通过使用@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")指定了某个通道的错误处理映射。其中,inputChannel的配置中对应关系如下: message-topic:消息通道对应的目标(destination,即:spring.cloud.stream.bindings.MQRece.destination的配置) rece-group:消息通道对应的消费组(group,即:spring.cloud.stream.bindings.MQRece.group的配置) 访问http://localhost:20000/sendMessage,结果图如下 cloud-stream-consumer日志输出 file cloud-stream-producer日志输出 file 个人联系方式QQ:944484545,欢https://www.cnblogs.com/hy-xiaobin/p/12175120.html