700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > SpringBoot实现MQTT消息发送和接收

SpringBoot实现MQTT消息发送和接收

时间:2022-07-13 00:37:50

相关推荐

SpringBoot实现MQTT消息发送和接收

Spring integration交互逻辑

对于发布者:

消息通过消息网关发送出去,由MessageChannel的实例DirectChannel处理发送的细节。DirectChannel收到消息后,内部通过MessageHandler的实例MqttPahoMessageHandler发送到指定的 Topic。

对于订阅者:

通过注入MessageProducerSupport的实例MqttPahoMessageDrivenChannelAdapter,实现订阅 Topic 和绑定消息消费的MessageChannel。同样由MessageChannel的实例DirectChannel处理消费细节。Channel 消息后会发送给我们自定义的MqttInboundMessageHandler实例进行消费。

可以看到整个处理的流程和前面将的基本一致。Spring Integration 就是抽象出了这么一套消息通信的机制,具体的通信细节由它集成的中间件来决定

1、maven依赖

<!-- /artifact/org.springframework.boot/spring-boot-starter-integration --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId><version>2.5.1</version></dependency><!-- /artifact/org.springframework.integration/spring-integration-stream --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId><version>5.5.5</version></dependency><!-- /artifact/org.springframework.integration/spring-integration-mqtt --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.5</version></dependency>

2、yaml配置文件

#mqtt配置mqtt:username: 123password: 123#MQTT-服务器连接地址,如果有多个,用逗号隔开url: tcp://127.0.0.1:1883#MQTT-连接服务器默认客户端IDclient:id: ${random.value}default:#MQTT-默认的消息推送主题,实际可在调用接口时指定topic: topic,mqtt/test/##连接超时completionTimeout: 3000

3、mqtt生产者消费者配置类

import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.integration.annotation.IntegrationComponentScan;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.integration.channel.DirectChannel;import org.springframework.integration.core.MessageProducer;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.core.MqttPahoClientFactory;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.MessageHandler;import org.springframework.messaging.MessagingException;import java.util.Arrays;import java.util.List;/*** mqtt 推送and接收 消息类**/@Configuration@IntegrationComponentScan@Slf4jpublic class MqttSenderAndReceiveConfig {private static final byte[] WILL_DATA;static {WILL_DATA = "offline".getBytes();}@Autowiredprivate MqttReceiveHandle mqttReceiveHandle;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.url}")private String hostUrl;@Value("${mqtt.client.id}")private String clientId;@Value("${mqtt.default.topic}")private String defaultTopic;@Value("${pletionTimeout}")private int completionTimeout; //连接超时/*** MQTT连接器选项**/@Bean(value = "getMqttConnectOptions")public MqttConnectOptions getMqttConnectOptions1() {MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接mqttConnectOptions.setCleanSession(true);// 设置超时时间 单位为秒mqttConnectOptions.setConnectionTimeout(10);mqttConnectOptions.setAutomaticReconnect(true);mqttConnectOptions.setUserName(username);mqttConnectOptions.setPassword(password.toCharArray());mqttConnectOptions.setServerURIs(new String[]{hostUrl});// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制mqttConnectOptions.setKeepAliveInterval(10);// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。//mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);return mqttConnectOptions;}/*** MQTT工厂**/@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions1());return factory;}/*** MQTT信息通道(生产者)**/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** MQTT消息处理器(生产者)**/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(defaultTopic);messageHandler.setAsyncEvents(true); // 消息发送和传输完成会有异步的通知回调//设置转换器 发送bytes数据DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();converter.setPayloadAsBytes(true);return messageHandler;}/*** 配置client,监听的topic* MQTT消息订阅绑定(消费者)**/@Beanpublic MessageProducer inbound() {List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));String[] topics = new String[topicList.size()];topicList.toArray(topics);MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", mqttClientFactory(), topics);adapter.setCompletionTimeout(completionTimeout);DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();converter.setPayloadAsBytes(true);adapter.setConverter(converter);adapter.setQos(2);adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** MQTT信息通道(消费者)**/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** MQTT消息处理器(消费者)**/@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {//处理接收消息mqttReceiveHandle.handle(message);}};}}

4、消息处理类

/*** mqtt客户端消息处理类**/@Slf4j@Componentpublic class MqttReceiveHandle {public void handle(Message<?> message) {log.info("收到订阅消息: {}", message);String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();log.info("消息主题:{}", topic);Object payLoad = message.getPayload();byte[] data = (byte[]) payLoad;Packet packet = Packet.parse(data);log.info("发送的Packet数据{}", JSON.toJSONString(packet));}}

5、mqtt发送接口

import org.springframework.integration.annotation.MessagingGateway;import org.springframework.integration.mqtt.support.MqttHeaders;import org.springframework.messaging.handler.annotation.Header;/*** mqtt发送消息* (defaultRequestChannel = "mqttOutboundChannel" 对应config配置)* **/@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGateway {/*** 发送信息到MQTT服务器** @param*/void sendToMqttObject(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);/*** 发送信息到MQTT服务器** @param topic 主题* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);/*** 发送信息到MQTT服务器** @param topic 主题* @param qos 对消息处理的几种机制。* 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。* 2 多了一次去重的动作,确保订阅者收到的消息有一次。* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);/*** 发送信息到MQTT服务器** @param topic 主题* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, Object payload);/*** 发送信息到MQTT服务器** @param topic 主题* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);}

6、mqtt事件监听类

import lombok.extern.slf4j.Slf4j;import org.springframework.context.event.EventListener;import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;import org.springframework.integration.mqtt.event.MqttMessageSentEvent;import org.springframework.integration.mqtt.event.MqttSubscribedEvent;import org.ponent;@Slf4j@Componentpublic class MqttListener {/*** 连接失败的事件通知* @param mqttConnectionFailedEvent*/@EventListener(classes = MqttConnectionFailedEvent.class)public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) {log.info("连接失败的事件通知");}/*** 已发送的事件通知* @param mqttMessageSentEvent*/@EventListener(classes = MqttMessageSentEvent.class)public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) {log.info("已发送的事件通知");}/*** 已传输完成的事件通知* 1.QOS == 0,发送消息后会即可进行此事件回调,因为不需要等待回执* 2.QOS == 1,发送消息后会等待ACK回执,ACK回执后会进行此事件通知* 3.QOS == 2,发送消息后会等待PubRECV回执,知道收到PubCOMP后会进行此事件通知* @param mqttMessageDeliveredEvent*/@EventListener(classes = MqttMessageDeliveredEvent.class)public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) {log.info("已传输完成的事件通知");}/*** 消息订阅的事件通知* @param mqttSubscribedEvent*/@EventListener(classes = MqttSubscribedEvent.class)public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) {log.info("消息订阅的事件通知");}}

7、接口测试

@Resourceprivate MqttGateway mqttGateway;/*** sendData 消息* topic 订阅主题**/@RequestMapping(value = "/sendMqtt",method = RequestMethod.POST)public String sendMqtt(String sendData, String topic) {MqttMessage mqttMessage = new MqttMessage();mqttGateway.sendToMqtt(topic, sendData);//mqttGateway.sendToMqttObject(topic, sendData.getBytes());return "OK";}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。