1.基本介绍
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
Redis 客户端可以订阅任意数量的频道。
publish(发布命令)
API:publish channel message #发布一条消息到制定频道
演示:
127.0.0.1:6379> publish cctv:5 "hello world"(integer) 0 #订阅者个数
subscribe(订阅)
API:subscribe [channel] #订阅一个或多个频道
演示:
开启两个客户端,一个客户端先订阅cctv:5这个频道,另一个客户端往这个频道发送消息,订阅者会接收到消息。
unsubcribe(取消订阅)
API:unsubcribe [channel] #取消订阅一个或多个频道
演示:
127.0.0.1:6379> unsubscribe cctv:51) "unsubscribe"2) "cctv:5"3) (integer) 0
其它常用API
psubscribe [pattern...] #订阅模式punsubscribe [pattern...] #退订制定的模式pubsub channels #列出至少有一个订阅者的模式pubsub numsub [channel...] #列出给定频道的订阅者数量pubsub numpat #列出被订阅模式的数量
2.SpringBoot实现redis的订阅模式
(1)导入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
(2)配置redis
spring:redis:host: localhostport: 6379
(3)redis监听配置
@Configuration@AutoConfigureAfter({RedisReceiver.class})public class RedisListenerConfig {/*** redis消息监听器容器* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理** @param connectionFactory* @param listenerAdapter* @return*/@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);//可以添加多个 messageListenercontainer.addMessageListener(listenerAdapter, new PatternTopic("test"));return container;}/*** 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法** @param redisReceiver* @return*/@BeanMessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {System.out.println("消息适配器进来了");return new MessageListenerAdapter(redisReceiver, "receiveMessage");}/*** 使用默认的工厂初始化redis操作模板** @param connectionFactory* @return*/@BeanStringRedisTemplate template(RedisConnectionFactory connectionFactory) {return new StringRedisTemplate(connectionFactory);}}
(4)消息处理类
@Componentpublic class RedisReceiver {/*** 这里是收到通道的消息之后执行的方法** @param message*/public void receiveMessage(String message) {System.out.println("消息来了:" + message);}}
(5)测试类
@EnableScheduling@Componentpublic class TestController {@Autowiredprivate StringRedisTemplate stringRedisTemplate;/*** 向redis消息队列test通道发布消息*/@Scheduled(fixedRate = 1000)public void sendMessage() {stringRedisTemplate.convertAndSend("test", String.valueOf(Math.random()));}}
测试结果如下:
消息来了:0.07573790224258636消息来了:0.6782217218490485消息来了:0.0427400486474消息来了:0.01991680494899495消息来了:0.21214175193928786消息来了:0.8152357687137979
(6)实现一个方法监听多个频道
@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);//可以添加多个 messageListenercontainer.addMessageListener(listenerAdapter, new PatternTopic("test"));container.addMessageListener(listenerAdapter, new PatternTopic("test2"));return container;}
这样之前的receiveMessage方法会接收到来自test和test2两个频道的所有消息。
(7)实现不同方法监听不同频道
修改我们的配置类,增加消息的适配器参数,之前只有一个适配器参数,现在我们增加到两个。并把test3的频道交给第二个适配器接收。
@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter,MessageListenerAdapter listenerAdapter2) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);//可以添加多个 messageListenercontainer.addMessageListener(listenerAdapter, new PatternTopic("test"));container.addMessageListener(listenerAdapter, new PatternTopic("test2"));container.addMessageListener(listenerAdapter2, new PatternTopic("test3"));return container;}@BeanMessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {System.out.println("消息适配器进来了");return new MessageListenerAdapter(redisReceiver, "receiveMessage");}@BeanMessageListenerAdapter listenerAdapter2(RedisReceiver redisReceiver) {System.out.println("消息适配器2进来了");return new MessageListenerAdapter(redisReceiver, "receiveMessage2");}
测试方法:
@EnableScheduling@Componentpublic class TestController {@Autowiredprivate StringRedisTemplate stringRedisTemplate;/*** 向redis消息队列test通道发布消息*/@Scheduled(fixedRate = 1000)public void sendMessage() {stringRedisTemplate.convertAndSend("test", String.valueOf(Math.random()));stringRedisTemplate.convertAndSend("test2", "hello world");}@Scheduled(fixedRate = 1000)public void sendMessage2() {stringRedisTemplate.convertAndSend("test3", "hello redis");}}
测试结果如下:
消息来了:0.21550984623295588消息来了:hello world消息来了2:hello redis消息来了:0.4472284248403804消息来了:hello world消息来了2:hello redis消息来了:0.3441512587764187消息来了:hello world消息来了2:hello redis