700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > Spring boot整合Redis实现发布订阅(超详细)

Spring boot整合Redis实现发布订阅(超详细)

时间:2020-07-15 08:39:08

相关推荐

Spring boot整合Redis实现发布订阅(超详细)

Redis发布订阅

基础知识相关命令订阅者/等待接收消息发布者/发送消息订阅者/成功接收消息常用命令汇总原理Spring boot整合redis导入依赖Redis配置消息封装类(MessageDto)Redis配置类测试类订阅方实现一:RedisMessageListener订阅方实现二:PrintMessageReceiverMessageListenerAdapter源码分析

以下是Redis相关笔记总结,方便自己以后复习,同时也希望对大家有所帮助。

基础知识

Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收信息。微信,微博,关注系统

Redis客户端可以订阅任意数量的频道

订阅/发布消息图:

剖析:

1.消息发送者,2.频道,3.消息订阅者

下图展示频道channel1,以及订阅这个频道的三个客户端–client2,client5和client1之间的关系

当有新消息通过PUBLISH命令发送给频道channel1时,这个消息就会被发送给订阅它的三个客户端

相关命令

订阅者/等待接收消息

首先打开 Redis 客户端,然后订阅了一个名为“bbx”的 channel,使用如下命令:

127.0.0.1:6379> SUBSCRIBE bbxReading messages... (press Ctrl-C to quit)1) "subscribe"2) "bbx"3) (integer) 1

使用SUBSCRIBE命令订阅了名为 bbx 的 channel。命令执行后该客户端会出处于等待接收消息的阻塞状态。

发布者/发送消息

下面再启动一个 Redis 客户端,输入如下命令:

127.0.0.1:6379> PUBLISH bbx hello(integer) 1127.0.0.1:6379> PUBLISH bbx world(integer) 1127.0.0.1:6379>

订阅者/成功接收消息

127.0.0.1:6379> SUBSCRIBE bbxReading messages... (press Ctrl-C to quit)1) "subscribe"2) "bbx"3) (integer) 1#等待读取推送消息1) "message"#消息2) "bbx"#频道3) "hello"#消息具体内容1) "message"2) "bbx"3) "world"

常用命令汇总

原理

Redis是使用C实现的,可以通过分析Redis源码里的pubsub.c文件,了解发布和订阅机制的底层实现

Redis通过PUBLISH,SUBSCRIBE和PSUBSCRIBE等命令实现发布和订阅功能

通过SUBSCRIBE命令订阅某频道后,redis-server里维护了一个字典,字典的键就是一个频道,字典的值则是一个链表,链表中保存了所有订阅这个频道的客户端。SUBSCRIBE命令的关键,就是将客户端添加到给定频道的订阅链表中。

通过PUBLISH命令向订阅者发送消息,redis-server会使用给定频道作为键,在它维护的频道字典中查找记录了订阅这个频道的所有客户端的链表,将消息发布给所有订阅者

Pub和Sub从字面上理解就是发布(Publish)和订阅(Subscribe),在redis中,可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的信息,这一功能最明显的用法就是实时消息系统,比如普通的即时聊天,群聊等功能。

Spring boot整合redis

导入依赖

<!--操作redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

Redis配置

#SpringBoot 所有的配置类,都有一个自动配置类 RedisAutoConfiguration#自动配置类都每绑定一个properties配置文件 RedisProperties#配置redisspring.redis.host=localhostspring.redis.port=6379# Redis服务器连接密码(默认为空)spring.redis.password=*****#默认是数据库0spring.redis.database= 0# 连接池最大连接数(使用负值表示没有限制) 默认 8spring.redis.lettuce.pool.max-active=8# 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1spring.redis.lettuce.pool.max-wait=-1# 连接池中的最大空闲连接 默认 8spring.redis.lettuce.pool.max-idle=8# 连接池中的最小空闲连接 默认 0spring.redis.lettuce.pool.min-idle=0

消息封装类(MessageDto)

@AllArgsConstructor@NoArgsConstructor@Datapublic class MessageDto implements Serializable {private String data;private String title;private String content;}

Redis配置类

@Configurationpublic class RedisConfig {//编写配置类,可模仿RedisAutoConfiguration配置类,该类在开发中可直接使用@Bean@SuppressWarnings("all")public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {//由于源码autoConfig中是<Object, Object>,开发中一般直接使用<String,Object>RedisTemplate<String, Object> template = new RedisTemplate();template.setConnectionFactory(factory);//Json序列化配置Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(om);//String的序列化StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();//key采用string的序列化template.setKeySerializer(stringRedisSerializer);//hash的key采用string的序列化template.setHashKeySerializer(stringRedisSerializer);//value序列化采用jacksontemplate.setValueSerializer(jackson2JsonRedisSerializer);//hash的value序列化方式采用jacksontemplate.setHashValueSerializer(jackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}/*** Redis消息监听器容器* 这个容器加载了RedisConnectionFactory和消息监听器* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理** @param redisConnectionFactory 连接工厂* @param adapter适配器* @return redis消息监听容器*/@Bean@SuppressWarnings("all")public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory,RedisMessageListener listener,MessageListenerAdapter adapter) {final String TOPIC_NAME1 = "TEST_TOPIC1"; // 订阅主题final String TOPIC_NAME2 = "TEST_TOPIC2"; // 订阅主题RedisMessageListenerContainer container = new RedisMessageListenerContainer();// 监听所有库的key过期事件container.setConnectionFactory(redisConnectionFactory);// 所有的订阅消息,都需要在这里进行注册绑定,new PatternTopic(TOPIC_NAME1)表示发布的主题信息// 可以添加多个 messageListener,配置不同的通道container.addMessageListener(listener, new PatternTopic(TOPIC_NAME1));container.addMessageListener(adapter, new PatternTopic(TOPIC_NAME2));/*** 设置序列化对象* 特别注意:1. 发布的时候需要设置序列化;订阅方也需要设置序列化* 2. 设置序列化对象必须放在[加入消息监听器]这一步后面,否则会导致接收器接收不到消息*/Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);seria.setObjectMapper(objectMapper);container.setTopicSerializer(seria);return container;}/*** 这个地方是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”* 也有好几个重载方法,这边默认调用处理器的方法 叫OnMessage** @param printMessageReceiver* @return*/@Beanpublic MessageListenerAdapter listenerAdapter(PrintMessageReceiver printMessageReceiver) {MessageListenerAdapter receiveMessage = new MessageListenerAdapter(printMessageReceiver, "receiveMessage");Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);seria.setObjectMapper(objectMapper);receiveMessage.setSerializer(seria);return receiveMessage;}}

该类中,可以通过调用消息接收容器(container)的 addMessageListener(MessageListener listener, Topic topic) 方法 订阅消息;相反地,也可以调用它的 removeMessageListener(MessageListener listener, Topic topic) 方法 取消订阅消息;在这里我们分别使用两种实现方式去订阅两个不通的频道(channel)。

RedisMessageListener 通过实现MessageListener接口,从而实现该接口中的onMessage(Message message, byte[] pattern)方法。MessageListenerAdapter 通过适配器的方式,自定义一个消息接收类PrintMessageReceiver和接收消息的方法

container.addMessageListener(listener, new PatternTopic(TOPIC_NAME1));container.addMessageListener(adapter, new PatternTopic(TOPIC_NAME2));

分别使用listener去订阅主题TOPIC_NAME1,adapter去订阅TOPIC_NAME2。

接下来分别探讨测试这两种方式。

测试类

@Slf4j@SpringBootTestpublic class RedisMessageTest {@Autowiredprivate RedisUtils redisUtils;@Testpublic void test(){final String TOPIC_NAME1 = "TEST_TOPIC1"; // 订阅主题final String TOPIC_NAME2 = "TEST_TOPIC2"; // 订阅主题// 发布消息MessageDto dto = new MessageDto();LocalDateTime now = LocalDateTime.now();DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");dto.setData(timeFormatter.format(now));dto.setTitle("日常信息");dto.setContent("hello world!");redisUtils.convertAndSend(TOPIC_NAME1, dto);}}

该类中的RedisUtils是之前自己封装的一个工具类,在该类中新增convertAndSend()方法。

RedisUtils中其他方法可跳转此连接查看

/*** 向通道发布消息*/public boolean convertAndSend(String channel, Object message) {if (!StringUtils.hasText(channel)) {return false;}try {redisTemplate.convertAndSend(channel, message);log.info("发送消息成功,channel:{},message:{}", channel, message);return true;} catch (Exception e) {log.info("发送消息失败,channel:{},message:{}", channel, message);e.printStackTrace();}return false;}

订阅方实现一:RedisMessageListener

@Slf4j@Componentpublic class RedisMessageListener implements MessageListener {@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {// 接收的topiclog.info("channel:" + new String(pattern));//序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)MessageDto messageDto = (MessageDto) redisTemplate.getValueSerializer().deserialize(message.getBody());log.info(messageDto.getData()+","+messageDto.getContent());}}

对使用RedisMessageListener 进行接收消息测试。

测试结果:

订阅方实现二:PrintMessageReceiver

@Slf4j@Componentpublic class PrintMessageReceiver {@Autowiredprivate RedisTemplate redisTemplate;public void receiveMessage(MessageDto messageDto , String channel) {// 接收的topiclog.info("channel:" + channel);log.info("message:" + messageDto.getTitle());}}

注意:该方法的接收参数类型以及顺序,查阅源码得知,该方法的参数可以是一个(只有消息message),也可是两个(message,channel)并且顺序不能变。

在测试类中将 redisUtils.convertAndSend(TOPIC_NAME1, dto);中的TOPIC_NAME1改为TOPIC_NAME2,

测试结果:

MessageListenerAdapter源码分析

构造函数

public MessageListenerAdapter(Object delegate, String defaultListenerMethod) {this(delegate);setDefaultListenerMethod(defaultListenerMethod);}

其中this()方法中,初始化了序列化方式,该适配器默认的序列化方式是UTF-8的字符串序列化。

2.onMessage()

@Overridepublic void onMessage(Message message, @Nullable byte[] pattern) {try {// Check whether the delegate is a MessageListener impl itself.// In that case, the adapter will simply act as a pass-through.if (delegate != this) {if (delegate instanceof MessageListener) {((MessageListener) delegate).onMessage(message, pattern);return;}}// Regular case: find a handler method reflectively.Object convertedMessage = extractMessage(message);String convertedChannel = stringSerializer.deserialize(pattern);// Invoke the handler method with appropriate arguments.Object[] listenerArguments = new Object[] {convertedMessage, convertedChannel };invokeListenerMethod(invoker.getMethodName(), listenerArguments);} catch (Throwable th) {handleListenerException(th);}}

该方法当订阅频道有消息时默认执行,首先,if(delegate instanceof MessageListener)判断该对象的类是不是实现了MessageListener接口,如果是,就会执行它实现的onMessage()。很显然,我们是自定义的

PrintMessageReceiver 对象,,所以接着往下看。

Object convertedMessage = extractMessage(message);会将message反序列化,如未自定义序列化方式,就会用使用默认的字符串序列化,这就是为什么我们在RedisConfig类中注入listenerAdapter对象时,自定义了Jackson2JsonRedisSerializer 。

3. invokeListenerMethod(invoker.getMethodName(), listenerArguments);

通过反射查找定义对象中处理消息的方法。我们会看到如下的方法实现。

void invoke(Object[] arguments) throws InvocationTargetException, IllegalAccessException {Object[] message = new Object[] {arguments[0] };for (Method m : methods) {Class<?>[] types = m.getParameterTypes();Object[] args = //types.length == 2 //&& types[0].isInstance(arguments[0]) //&& types[1].isInstance(arguments[1]) ? arguments : message;if (!types[0].(args[0])) {continue;}m.invoke(delegate, args);return;}}

从而得知我们自定义方法中参数个数可以是一个也可以是两个,如两个参数时,第一个参数接收消息(message),第二个参数接收频道(channel),也可得知为什么自定义方法中,接收消息参数类型我们可以直接写MessageDto。

以上内容如有不对之处,还望不吝赐教。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。