最近在做毕业设计,需要用到 Websocket 实现消息的推送。
在 Spring 官网上查看文档,发现 Spring 对 Websocket 的功能进行了太多扩展,引入了 STOMP 和 Broker 的概念(https://spring.io/guides/gs/messaging-stomp-websocket/ )。
当然,我们还需要根据自己项目的需要修改相应的代码。
创建配置类
在Websocket建立连接之前,会先通过HTTP协议进行握手。
如果要拒绝未认证的用户用Websocket连接到服务器,可以新建一个拦截器并在beforeHandshake方法内对用户进行认证
加入Spring 提供的 HttpSessionHandshakeInterceptor拦截器可以将HttpSession的属性复制到WebsocketSession中,我们可以用这个方法识别用户。
首先,创建一个WebSocketConfig.class 文件,作为Websocket的配置类,文件内容如下。
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
MyHandler myHandler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry
.addHandler(myHandler, "/myHandler") // 指定处理的 websocket 路径 /myHandler
//.addInterceptors() 添加你自己的拦截器
//.addInterceptors(new HttpSessionHandshakeInterceptor())
.setAllowedOrigins("*"); // 设置允许连接的源,正式上线话要做调整
}
}
处理消息
创建 MyHandler.class 文件,用来处理Websocket请求,因为我们目前只需要处理文本内容,因此继承了TextWebSocketHandler。
如果要处理二进制内容,可以覆盖handleBinaryMessage方法或去继承TextWebSocketHandler的父类AbstractWebSocketHandler
@Component
public class MyHandler extends TextWebSocketHandler {
@Autowired
WebSocketService service;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 连接建立之后
super.afterConnectionEstablished(session);
service.addWebSocketSession(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
// 连接关闭之后
super.afterConnectionClosed(session, status);
service.removeWebSocketSession(session);
}
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) {
// 处理另一端发送过来的消息
logger.debug("id: " +session.getId());
logger.debug("attributes: " + session.getAttributes());
logger.debug("message: " + message.getPayload());
service.handleTextMessage(session, message)
}
}
创建WebsocketService类,进行业务处理
@Service
public class WebSocketService {
// 用synchronized或ConcurrentHashMap保证多线程安全,因为我们还需要实现广播,所以这里我用了synchronized实现线程安全
Map sessionMap = new HashMap<>();
public synchronized void addWebSocketSession(WebSocketSession session) {
String id = "";// 从session中获取的唯一标识
// session 中还可以获得其他握手前的http各种属性,如URL,请求头,这些也可以作为唯一标识
sessionMap.put(id, session); //保存会话
}
public synchronized void removeWebSocketSession(WebSocketSession session) {
String id = "";
sessionMap.remove(id); // 删除会话
}
public synchronized void broadcast(String message) {
for(WebSocketSession session: sessionMap.values()) {
TextMessage tm = new TextMessage("" /*你要广播的数据*/);
try {
session.sendMessage(tm);
} catch (IOException e) {
// 发送失败后,需要继续给其他人广播,因此在循环里面用 try 捕获异常
}
}
}
public synchronized void unicast(String id, String message) {
WebSocketSession session = sessionMap.get(id);
TextMessage tm = new TextMessage("" /*你要发送的数据*/);
try {
session.sendMessage(tm);
} catch (IOException e) {
}
}
public synchronized void handleTextMessage(WebSocketSession session, TextMessage message) {
// 处理另一端发送过来的消息
}
}