700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > MQTT客户端 Paho Java 使用

MQTT客户端 Paho Java 使用

时间:2018-11-28 19:01:25

相关推荐

MQTT客户端 Paho Java 使用

文章目录

01、maven 依赖02、代码1-publisher 发布者2-订阅者 subscriber

01、maven 依赖

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version></dependency>

02、代码

1-publisher 发布者

public static void publishMessage(byte[] messageContent, String topic) {try {//获取MQTT 连接的选项MqttConnectOptions connOpts = getMqttConnectOptions();//新建客户端 参数分别是MQTT服务的地址,客户端名称(最好随机生成),第三个持久化还没研究我暂时用不到这样写就好了 MqttClient client = new MqttClient("tcp://broker.emqx.io:1883", "clientName", new MemoryPersistence());//建立链接client.connect(connOpts);// 创建消息MqttMessage message = new MqttMessage(messageContent);// 设置消息的服务质量 QOS0 QOS1 QOS2 等具体查看 Mqtt主题那点事对应的文章message.setQos(Constant.MQTT_QOS1);// 发布消息client.publish(topic, message);// 断开连接client.disconnect();// 关闭客户端client.close();} catch (MqttException me) {logger.error(me.getMessage(), me);}}

私有方法 getMqttConnectOptions() 代码如下

private static MqttConnectOptions getMqttConnectOptions() {// 创建链接参数MqttConnectOptions connOpts = new MqttConnectOptions();// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接connOpts.setCleanSession(true);// 设置连接的用户名 username 和 password 是安装好mqtt 服务后再后台设置的connOpts.setUserName("username");connOpts.setPassword("password".toCharArray());// 设置超时时间 单位为秒connOpts.setConnectionTimeout(120);// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制connOpts.setKeepAliveInterval(20);return connOpts;}

2-订阅者 subscriber

public static void subscribe(String... topic) {try {// MQTT的连接设置MqttConnectOptions mqttConnectOptions = getMqttConnectOptions();// 设置为断开时自动链接 mqttConnectOptions.setAutomaticReconnect(true);MqttClient client = new MqttClient(Constant.BROKER_ADDRESS + ":" + Constant.PORT, "sub_" + MqttUtil.getRandomString(10), new MemoryPersistence());// 设置回调函数client.setCallback(new MqttCallbackExtended() {@Overridepublic void connectComplete(boolean b, String s) {//链接成功时触发 用于重现连接logger.info("connect success " + s);try {//client 服务中需要订阅的主题client.subscribe(MessageUtil.getTopicFilter());} catch (MqttException e) {e.printStackTrace();}}/*** 连接断开 触发的函数*/@Overridepublic void connectionLost(Throwable throwable) {logger.error("device connect lost", throwable);if (!client.isConnected()) {try {//出现意外断开 触发重新连接client.reconnect();} catch (Exception e) {logger.error("Mqtt client close error", e);}}}/*** 接收订阅主题的内容 */@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) {//处理消息内容 handleArriveMessage(topic, mqttMessage);}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {logger.debug("deliveryComplete");}});client.connect(mqttConnectOptions);//订阅消息for (String t : topic) {client.subscribe(t, Constant.MQTT_QOS1);}} catch (Exception e) {logger.error(e.getMessage(), e);}}

为什么我的服务分别建立订阅者和发布者,因为订阅者需要保持接收消息,发布者只负责推送消息 ,推送消息完成关闭后也不会建立链接

重点内容:

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。