Redis的发布订阅模式
发布订阅(Pub/Sub):目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者,其就是基于观察者模式设计的.
适用场景
(1).优惠券到期到期失效
(2).微信粉丝关注公众号
(3).消息及时通讯(redis不像中间件一样,不会对发布的消息进行缓存,确认,重试等机制)
redis发布订阅模式案例
1.先创建订阅者,并为订阅者设置订阅主题
2.创建发布者,向指定主题发送消息
3.消息订阅者接收到发布者发布的消息
注:对比RabbitMQ的消息订阅者模式,两者异同如下:
Redis:轻量级,低延迟,高并发,低可靠性;
RabbitMQ:重量级,高可靠,异步,不保证实时;
RabbitMQ是一个专门的 AMQP 协议队列,他的优势就在于提供可靠的队列服务,并且可做到异步,而Redis主要是用于缓存的,Redis 的发布订阅模式,可用于实现及时性,且可靠性低的功能。
springboot整合Redis的发布订阅功能
1.导入pom依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
本人测试优惠券场景时引入的完整pom
<?xml version="1.0" encoding="UTF-8"?><project xmlns="/POM/4.0.0"xmlns:xsi="/2001/XMLSchema-instance"xsi:schemaLocation="/POM/4.0.0 /xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.tedu</groupId><artifactId>redissubandpub</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.2.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.2.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.5.4</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.4</version><scope>provided</scope></dependency></dependencies></project>
2.application.yml配置(其他的可以结合mysql进行测试的相关配置)
server:port: 8080spring:#引入数据库配置datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/coupon?useUnicode=true&charaterEncoding=utf8&serverTimezone=UTCusername: rootpassword: root#配置mybatis映射文件mybatis: mapper-locations: classpath:/mappers/*.xmltype-aliases-package: com.tedu.scheduletask.entityconfiguration:map-underscore-to-camel-case: trueredis:host: 127.0.0.1port: 6379database: 0jedis:pool:# 连接池最大连接数(使用负值表示没有限制)max-active: 8# 连接池最大阻塞等待时间(使用负值表示没有限制)max-wait: -1# 连接池中的最大空闲连接max-idle: 8# 连接池中的最小空闲连接min-idle: 0timeout: 30000logging:level: com:tedu:scheduletask:mapper: debug
3.创建redis消息订阅配置类
@Configurationpublic class RedisConfig {@BeanRedisMessageListenerContainer container (RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter listenerAdapter){RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(redisConnectionFactory);// 可以添加多个 messageListener,配置不同的交换机container.addMessageListener(listenerAdapter,new PatternTopic("msg"));container.addMessageListener(listenerAdapter,new ChannelTopic("__keyevent@0__:expired"));return container;}//消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法@BeanMessageListenerAdapter listenerAdapter(RedisReceiver receiver) {return new MessageListenerAdapter(receiver, "receiveMessage");}@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {RedisTemplate<String, Object> template = new RedisTemplate();//连接工厂template.setConnectionFactory(redisConnectionFactory);//序列化配置Jackson2JsonRedisSerializer objectJackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);objectJackson2JsonRedisSerializer.setObjectMapper(objectMapper);StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();//配置具体序列化//key采用string的序列化方式template.setKeySerializer(stringRedisSerializer);//hash的key采用string的序列化方式template.setHashKeySerializer(stringRedisSerializer);//value序列化采用jacksontemplate.setValueSerializer(objectJackson2JsonRedisSerializer);//hash的value序列化采用jacksontemplate.setHashValueSerializer(objectJackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}}
此处贴上我测试时的实体类
@Datapublic class Coupon{@Idprivate Long id;//主键idprivate String name;//优惠券名称private BigDecimal money;//优惠券金额private String couponDesc;//说明private Date createTime;//获取时间private Date expireTime;//失效时间private Integer state;//状态 0:可用 1:已失效 2:已使用}
4.PatternTopic方法匹配模式
(1).订阅者监听消息
@Componentpublic class RedisReceiver {public void receiveMessage(String message) {System.out.println("subscribe receive message:"+message);}}
(2)发布者发布消息
@Componentpublic class MessageSend {@Autowiredprivate RedisTemplate redisTemplate;@Scheduled(fixedRate = 3000)//定时发送public void sendMessage(){for (int i = 0; i < 5; i++) {redisTemplate.convertAndSend("msg",String.valueOf(new Random().nextInt(100)));}}}
(3)运行程序,控制台输出结果