概述

Redis不仅可作为缓存服务器,还可用作消息队列,本示例演示如何使用redis实现发布/订阅消息队列。

  • 在Redis中,发布者没有将消息发送给特定订阅者的程序。相反,发布的消息被描述为通道,而不知道(如果有的话)可能有哪些订阅者。
  • 订阅者表示对一个或多个主题感兴趣,只接收感兴趣的消息,而不知道(如果有的话)发布者是什么。
  • 发布者和订阅者的这种解耦可以实现更大的可伸缩性和更动态的网络拓扑。

代码实现

redis实现mq的存储方式很多,可以使用list,zset及stream,这些数据的存储结构决定了怎么消费问题(消息是一次使用、允许多次使用、允许多端消息等),比如使用list,我们可以使用leftPush插入消息,使用rightPop消费消息,实现一条消息一次消息,可以参考与以示例代码:

    @Test     public void testMq() {         for (int i = 0; i < 10; i++) {             redisTemplate.opsForList().leftPush("task-queue", "data" + i);             log.info("插入了一个新的任务==>{}", "data" + i);         }         String taskId = redisTemplate.opsForList().rightPop("task-queue").toString();         log.info("处理成功,清除任务==>{}", taskId);     }

1.配置代码RedisConfig.java

package demo.data.mqRedis.config;  import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer;  @Configuration @EnableCaching public class RedisConfig extends CachingConfigurerSupport {      @Autowired     private RedisTemplate redisTemplate;      /**      * redisTemplate 序列化使用的jdkSerializeable, 存储二进制字节码, 所以自定义序列化类,方便调试redis      *      * @param redisConnectionFactory      * @return      */     @Bean     public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {          RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();          //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值         redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());         redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());          //使用StringRedisSerializer来序列化和反序列化redis的ke         redisTemplate.setKeySerializer(new StringRedisSerializer());         redisTemplate.setHashKeySerializer(new StringRedisSerializer());          //开启事务         redisTemplate.setEnableTransactionSupport(true);          redisTemplate.setConnectionFactory(redisConnectionFactory);          return redisTemplate;     }      @Bean     MessageListenerAdapter messageListener() {         return new MessageListenerAdapter(new RedisMessageSubscriber());     }      @Bean     RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,                                             MessageListenerAdapter listenerAdapter) {          RedisMessageListenerContainer container = new RedisMessageListenerContainer();         container.setConnectionFactory(connectionFactory);         container.addMessageListener(listenerAdapter, topic());          return container;     }      @Bean     MessagePublisher redisPublisher() {         return new RedisMessagePublisher(redisTemplate, topic());     }      @Bean     ChannelTopic topic() {         return new ChannelTopic(