700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > 利用Redis发布订阅模式 SSE实现分布式实时站内信系统

利用Redis发布订阅模式 SSE实现分布式实时站内信系统

时间:2018-10-26 00:47:29

相关推荐

利用Redis发布订阅模式 SSE实现分布式实时站内信系统

文章目录

前言一、SSE是什么?二、单机与集群的站内信实现方式有何区别?三、Redis 发布、订阅模式有何特点?四、代码演示1.数据模型小结 2.引入依赖3.配置RedissonClient4.编写RestController用于测试5.编写用户消息订阅逻辑6.实现消息的发布7.实现广播消息的订阅8.个人与团队消息的订阅8.效果演示 总结

前言

站内信功能在各大系统中被广泛应用,本文结合工作的实际场景,使用java springboot框架、Redis,探讨一种轻量化的实现分布式实时站内信系统方案。


一、SSE是什么?

SSE (Server-sent Events )是 WebSocket 的一种轻量代替方案,可实现服务端主动向客户端推送消息,例如在扫码支付结果反馈、邮箱服务的新邮件提醒,微博的新消息推送,SSE 都是不错的选择

在Springboot框架中已经集成,无需像websocket方式一样需要单独引入依赖,相对轻量。

浏览器断线自动重连,减少了前端业务逻辑。

二、单机与集群的站内信实现方式有何区别?

单机场景下逻辑较为简单,消息的发送端和用户登录在同一个节点,程序只需要在内存中找到所需的SseEmitter对象并发送即可。但生产环境往往不会只部署一个节点,那此时用户订阅的SseEmitter对象和消息的发送端很可能不在同一个节点,假设用户订阅产生的SseEmitter对象在A节点,但是消息发送在B节点,如何让A节点感知到有实时消息并调用SseEmitter.send()发送数据,需要一套中心化的消息推送机制才能实现。

三、Redis 发布、订阅模式有何特点?

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

下图展示了客户端与频道之间的订阅关系

下图展示了消息推送数据流向

相对于市面上其他的主流的消息中间件产品,如Kafka、RabbitMQ、ActiveMQ, RocketMQ,Redis的订阅发布功能相对轻量,但由于其数据只在内存中分发,不持久化的特点,当客户端程序重启时可能会发生丢失在重启期间的数据的问题,所以Redis的发布订阅功能,只针对业务体量小、追求轻量化、对数据完整性要求不高的场景下比较适合使用。

四、代码演示

1.数据模型

消息发布的范围分为全体广播、工作室广播、个人私信

全体广播:所有用户都能收到消息

工作室广播:仅指定工作室内的成员能够接收到消息

个人私信:仅个人能收到消息

用户表:

用户与工作室关系表:

具体数据可以下载源码启动后访问:http://127.0.0.1:9090/h2

小结

用户zhangsan、lisi同属团队1,wangwu属于团队2

2.引入依赖

Redission:

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.17.3</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>

3.配置RedissonClient

@Beanpublic RedissonClient redissonClient() {Config config = new Config();// 单节点SingleServerConfig singleServerConfig = config.useSingleServer();singleServerConfig.setAddress("redis://127.0.0.1:6379");// 使用json序列化方式ObjectMapper objectMapper = new ObjectMapper();objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);objectMapper.registerModule(new JavaTimeModule());objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);config.setCodec(new JsonJacksonCodec(objectMapper));return Redisson.create(config);}

4.编写RestController用于测试

@Slf4j@RequiredArgsConstructor@RestControllerpublic class InboxController {private final SseEmitterServer sseEmitterServer;private final IMsgSendService msgSendService;/*** 模拟用户上线,订阅自己的站内信,服务端返回SseEmitter对象** @return SseEmitter对象*/@GetMapping("/online/{userId}")public SseEmitter online(@PathVariable String userId) throws IOException {return sseEmitterServer.connect(userId);}/*** 模拟消息发布接口** @param publishDTO 消息体* @return 消息发送结果*/@PostMapping("/publish")public Result<Void> publish(@RequestBody PublishDTO publishDTO) {msgSendService.publish(publishDTO);return Result.success();}}

5.编写用户消息订阅逻辑

@Slf4j@Component@RequiredArgsConstructorpublic class SseEmitterServer {public final static String INBOX_USER_TOPIC_PREFIX = "inbox_topic_user_";private final RedissonClient redissonClient;public static Map<String, SseEmitter> onlineUserEmitters = new HashMap<>(100);private final InboxUserTopicListener inboxTopicUserMessageListener;/*** 创建用户连接并返回 SseEmitter** @return SseEmitter*/public SseEmitter connect(String userId) throws IOException {// 为方便演示,假如同一个用户多终端同时在线,仅保留最后一个终端if (onlineUserEmitters.containsKey(userId)) {SseEmitter emitter = onlineUserEmitters.remove(userId);try {emitter.send("您已被顶下线...");} catch (IOException e) {log.info("该用户可能已下线");} finally {plete();}}// 设置超时时间(和token有效期一致,超时后不再推送消息),0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutExceptionSseEmitterUTF8 sseEmitter = new SseEmitterUTF8(0L);// 注册回调sseEmitter.onCompletion(completionCallBack(userId));sseEmitter.onError(errorCallBack(userId));sseEmitter.onTimeout(timeoutCallBack(userId));onlineUserEmitters.put(userId, sseEmitter);// 订阅个人站内消息RTopic topic = redissonClient.getTopic(INBOX_USER_TOPIC_PREFIX + userId);// 仅首次订阅时需要增加监听器,否则容器内会存在多个监听器,一条消息用户会收到数次if (topic.countListeners() == 0) {topic.addListener(PublishDTO.class, inboxTopicUserMessageListener);}sseEmitter.send(String.format("%s号用户,欢迎登录!", userId));return sseEmitter;}private Runnable completionCallBack(String userId) {return () -> {log.info("用户 {} 连接结束", userId);onlineUserEmitters.remove(userId);};}private Runnable timeoutCallBack(String userId) {return () -> {log.info("用户 {} 连接超时", userId);onlineUserEmitters.remove(userId);};}private Consumer<Throwable> errorCallBack(String userId) {return throwable -> {log.info("用户 {} 连接异常", userId);onlineUserEmitters.remove(userId);};}}

6.实现消息的发布

@Service@RequiredArgsConstructorpublic class MsgSendServiceImpl implements IMsgSendService {private final RedissonClient redissonClient;private final IUserStudioRelationService userStudioRelationService;private final IInboxMsgService inboxMsgService;@Overridepublic void publish(PublishDTO publishDTO) {// 持久化数据到db(由于Redis的发布订阅模式,可能存在消息丢失风险,且实时站内信只会发送给在线的用户,// 未在线的用户登录后也需要看到离线期间的站内信,以及站内信管理功能的需要,所以站内信数据需要持久化)inboxMsgService.save(convert(publishDTO));Scope scope = publishDTO.getScope();if (scope == Scope.EVERYONE) {publishDTO.setTitle("广播消息: " + publishDTO.getTitle());// 向所有在线用户发送实时站内信redissonClient.getTopic("inbox_broadcast_msg_topic").publish(publishDTO);} else if (scope == Scope.STUDIO) {publishDTO.setTitle("工作室消息: " + publishDTO.getTitle());// 查询工作室下所有用户列表,发送信息到每个用户List<String> userIds = userStudioRelationService.list(new LambdaQueryWrapper<UserStudioRelationDO>().eq(UserStudioRelationDO::getStudioId, publishDTO.getTarget())).stream().map(UserStudioRelationDO::getUserId).collect(Collectors.toList());for (String userId : userIds) {// 将目标设置为用户id,在InboxUserTopicListener内用到publishDTO.setTarget(userId);redissonClient.getTopic("inbox_topic_user_" + userId).publish(publishDTO);}} else if (scope == Scope.USER) {publishDTO.setTitle("个人消息: " + publishDTO.getTitle());redissonClient.getTopic("inbox_topic_user_" + publishDTO.getTarget()).publish(publishDTO);} else {throw new IllegalArgumentException("不支持的发送范围");}}private InboxMsgDO convert(PublishDTO publishDTO) {InboxMsgDO inboxMsgDO = new InboxMsgDO();inboxMsgDO.setTitle(publishDTO.getTitle());inboxMsgDO.setContent(publishDTO.getContent());inboxMsgDO.setScope(publishDTO.getScope());inboxMsgDO.setTargetId(publishDTO.getTarget());return inboxMsgDO;}}

7.实现广播消息的订阅

@Slf4j@Component@RequiredArgsConstructorpublic class InboxBroadcastTopicListener implements MessageListener<PublishDTO> {private final RedissonClient redissonClient;@PostConstructpublic void subscribe() {// 不同于个人消息需要订阅指定的topic,单个服务实例仅需订阅1个inbox_broadcast_msg_topic,即可实现广播消息的订阅redissonClient.getTopic("inbox_broadcast_msg_topic").addListener(PublishDTO.class, this);log.info("订阅广播消息..");}/*** 监听到inbox_broadcast_msg_topic topic下有数据产生会进入此方法* @param channel* @param msg*/@Overridepublic void onMessage(CharSequence channel, PublishDTO msg) {// 遍历当前实例下所有的在线用户,为其广播消息for (Map.Entry<String, SseEmitter> entry : SseEmitterServer.onlineUserEmitters.entrySet()) {String userId = entry.getKey();SseEmitter emitter = entry.getValue();try {emitter.send(msg);} catch (IOException e) {log.info("发送站内信失败:{},用户可能已下线", userId);}}}}

8.个人与团队消息的订阅

@Slf4j@Component@RequiredArgsConstructorpublic class InboxUserTopicListener implements MessageListener<PublishDTO> {@Overridepublic void onMessage(CharSequence channel, PublishDTO msg) {String target = msg.getTarget();// 如果目标发送用户存在于当前实例if (SseEmitterServer.onlineUserEmitters.containsKey(target)) {SseEmitter emitter = SseEmitterServer.onlineUserEmitters.get(target);try {emitter.send(msg);} catch (IOException e) {log.info("发送站内信失败:{},用户可能已下线", target);}}}}

8.效果演示

模拟3位用户同时在线:

模拟向1号用户发送私信:

curl -XPOST http://127.0.0.1:9090/publish -H 'Content-Type:application/json' -d '{"title":"To User 1","content":"To User 1","scope":"USER","target":1}'

模拟向1号工作室发送广播:

curl -XPOST http://127.0.0.1:9090/publish -H 'Content-Type:application/json' -d '{"title":"To Studio 1","content":"To Studio 1","scope":"STUDIO","target":1}'

仅同属1号工作室的用户1和用户2收到站内信

模拟向全体成员发送广播消息:

curl -XPOST http://127.0.0.1:9090/publish -H 'Content-Type:application/json' -d '{"title":"To Everyone","content":"To Everyone","scope":"EVERYONE"}'

总结

文中所述代码仅为简单场景下演示,实际场景会稍微复杂一点,具体效果可以下载源码后自行研究,源码采用H2内存数据库,另仅需配置单机redis即可快速启动。

源码地址:/hongqianli/inbox-demo

H2数据库管理页面:http://127.0.0.1:9090/h2 账号:sa 密码:sa

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。