700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > C# 基于MQTTNet的服务端与客户端通信案例

C# 基于MQTTNet的服务端与客户端通信案例

时间:2020-01-14 11:43:41

相关推荐

C# 基于MQTTNet的服务端与客户端通信案例

MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议。

MQTTnet 是一个用于基于 MQTT 的通信的高性能 .NET 库。它提供 MQTT 客户端和 MQTT 服务器(代理),并支持最高到版本 5 的 MQTT 协议。

开发环境:VS;MQTT客户端:MQTTnet 3.0.11;MQTT服务端:MQTTnet 3.0.16

服务端及客户端Demo地址:C#基于MQTTNet的服务端与客户端通信案例-C#文档类资源-CSDN下载

客户端

WinForm 程序

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace MQTTDemo
{
    /// <summary>
    /// WinForms MQTT client demo: connects a managed MQTT client to a broker,
    /// subscribes/unsubscribes to the topic typed in the form, publishes messages,
    /// and shows received messages and status lines in text boxes.
    /// </summary>
    public partial class Form1 : Form
    {
        /// <summary>Timestamp prefix used for every line appended to a text box.</summary>
        private const string TimestampFormat = "yyyy-MM-dd HH:mm:ss: ";

        /// <summary>Options of the most recently started client.</summary>
        private MqttClientOptions options;

        /// <summary>The managed client; <c>null</c> while disconnected.</summary>
        private IManagedMqttClient mqttClient;

        public Form1()
        {
            InitializeComponent();
        }

        /// <summary>Logs that the client connected to the broker.</summary>
        private void OnSubscriberConnected(MqttClientConnectedEventArgs x)
        {
            AppendLogMsg("已连接到MQTT服务器!");
        }

        /// <summary>Logs that the client lost or closed its broker connection.</summary>
        private void OnSubscriberDisconnected(MqttClientDisconnectedEventArgs x)
        {
            AppendLogMsg("已断开MQTT服务器连接!");
        }

        /// <summary>
        /// Formats an incoming message (topic, pretty-printed payload, QoS) and
        /// queues it for display in the receive box.
        /// </summary>
        private void OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs x)
        {
            var payloadString = ConvertJsonString(x.ApplicationMessage.ConvertPayloadToString());
            var item = $"{Environment.NewLine}Topic: {x.ApplicationMessage.Topic}{Environment.NewLine}Payload: {payloadString} {Environment.NewLine}QoS: {x.ApplicationMessage.QualityOfServiceLevel}";

            // A message can arrive while the form is closing; BeginInvoke would throw then.
            if (IsDisposed || !IsHandleCreated)
            {
                return;
            }

            // BeginInvoke (not Invoke) so the MQTT receive path is not blocked by the UI.
            this.BeginInvoke((MethodInvoker)delegate
            {
                AppendReceiveMsg(item);
            });
        }

        /// <summary>
        /// Builds the client options from the form fields, wires the event handlers
        /// and starts the managed client.
        /// </summary>
        /// <exception cref="FormatException">The port field is not a valid TCP port.</exception>
        private async Task SubscriberStart()
        {
            var tcpServer = tbx_mqtt_server.Text.Trim();
            var portText = tbx_mqtt_port.Text.Trim();
            var mqttUser = tbx_user_name.Text.Trim();
            var mqttPassword = tbx_pwd.Text.Trim();

            int tcpPort;
            if (!int.TryParse(portText, out tcpPort) || tcpPort < 1 || tcpPort > 65535)
            {
                throw new FormatException($"无效的端口号:{portText}");
            }

            this.options = new MqttClientOptions
            {
                // NOTE(review): the client id is fixed, so two running instances of this
                // demo present the same id to the broker — confirm this is intended.
                ClientId = "ClientSubscriber",
                ProtocolVersion = MqttProtocolVersion.V311,
                ChannelOptions = new MqttClientTcpOptions
                {
                    Server = tcpServer,
                    Port = tcpPort
                },
                CleanSession = true,
                KeepAlivePeriod = TimeSpan.FromSeconds(5)
            };

            // Credentials are only sent when a user name was entered.
            if (!string.IsNullOrEmpty(mqttUser))
            {
                options.Credentials = new MqttClientCredentials
                {
                    Username = mqttUser,
                    Password = Encoding.UTF8.GetBytes(mqttPassword)
                };
            }

            var mqttFactory = new MqttFactory();
            this.mqttClient = mqttFactory.CreateManagedMqttClient();
            this.mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate((Action<MqttClientConnectedEventArgs>)OnSubscriberConnected);
            this.mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate((Action<MqttClientDisconnectedEventArgs>)OnSubscriberDisconnected);
            this.mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate((Action<MqttApplicationMessageReceivedEventArgs>)OnSubscriberMessageReceived);

            await this.mqttClient.StartAsync(new ManagedMqttClientOptions
            {
                ClientOptions = options
            });
        }

        /// <summary>
        /// Connect button: starts the client (if not already started), subscribes to the
        /// topic currently in the form, then switches the buttons to the connected state.
        /// A failure is written to the log box and leaves the form disconnected.
        /// </summary>
        private async void btn_connect_Click(object sender, EventArgs e)
        {
            if (this.mqttClient == null)
            {
                try
                {
                    await SubscriberStart();
                    await SubscribeCurrentTopicAsync();
                }
                catch (Exception ex)
                {
                    // This is an async void handler: an escaping exception would take the
                    // whole application down, so report it and roll back instead.
                    AppendLogMsg($"连接失败:{ex.Message}");
                    await ReleaseClientAsync();
                    return;
                }
            }

            btn_subscribe.Enabled = true;
            btn_cancel_subscribe.Enabled = true;
            btn_send_msg.Enabled = true;
            btn_connect.Enabled = false;
            btn_disconnect.Enabled = true;
        }

        /// <summary>Subscribe button: subscribes to the topic currently in the form.</summary>
        private async void btn_subscribe_Click(object sender, EventArgs e)
        {
            await SubscribeCurrentTopicAsync();
        }

        /// <summary>
        /// Publish button: sends the text of the send box to the publish topic
        /// with exactly-once QoS. Does nothing while disconnected.
        /// </summary>
        private async void btn_send_msg_Click(object sender, EventArgs e)
        {
            var client = this.mqttClient;
            if (client == null)
            {
                return;
            }

            var publish_topic = tbx_publish_topic.Text.Trim();
            if (string.IsNullOrEmpty(publish_topic))
            {
                AppendLogMsg("发布Topic不能为空!");
                return;
            }

            var message = new MqttApplicationMessageBuilder()
                .WithTopic(publish_topic)
                .WithPayload(tbx_send_msg.Text)
                .WithExactlyOnceQoS()
                .Build();

            await client.PublishAsync(message);
        }

        /// <summary>
        /// Disconnect button: stops and disposes the client, then switches the
        /// buttons back to the disconnected state.
        /// </summary>
        private async void btn_disconnect_Click(object sender, EventArgs e)
        {
            if (this.mqttClient == null)
            {
                return;
            }

            await ReleaseClientAsync();

            btn_connect.Enabled = true;
            btn_disconnect.Enabled = false;
            btn_subscribe.Enabled = false;
            btn_cancel_subscribe.Enabled = false;
            btn_send_msg.Enabled = false;
        }

        /// <summary>Appends a timestamped entry, followed by a blank line, to the receive box.</summary>
        private void AppendReceiveMsg(string msg)
        {
            RunOnUiThread(() => tbx_receive_msg.AppendText(
                DateTime.Now.ToString(TimestampFormat) + msg + Environment.NewLine + Environment.NewLine));
        }

        /// <summary>Appends a timestamped line to the send box.</summary>
        private void AppendSendMsg(string msg)
        {
            RunOnUiThread(() => tbx_send_msg.AppendText(
                DateTime.Now.ToString(TimestampFormat) + msg + Environment.NewLine));
        }

        /// <summary>Appends a timestamped line to the log box.</summary>
        private void AppendLogMsg(string msg)
        {
            RunOnUiThread(() => tbx_log.AppendText(
                DateTime.Now.ToString(TimestampFormat) + msg + Environment.NewLine));
        }

        /// <summary>
        /// Re-serializes <paramref name="str"/> as indented JSON (4 spaces).
        /// </summary>
        /// <returns>
        /// The indented JSON, or <paramref name="str"/> unchanged when it is null/empty
        /// or is not a single well-formed JSON value.
        /// </returns>
        private string ConvertJsonString(string str)
        {
            if (string.IsNullOrEmpty(str))
            {
                return str;
            }

            try
            {
                // Without CheckAdditionalContent a payload such as "1 abc" would be
                // parsed as the number 1 and everything after it silently dropped.
                var serializer = new JsonSerializer { CheckAdditionalContent = true };

                object parsed;
                using (var jsonReader = new JsonTextReader(new StringReader(str)))
                {
                    parsed = serializer.Deserialize(jsonReader);
                }

                if (parsed == null)
                {
                    return str;
                }

                using (var textWriter = new StringWriter())
                using (var jsonWriter = new JsonTextWriter(textWriter)
                {
                    Formatting = Formatting.Indented,
                    Indentation = 4,
                    IndentChar = ' '
                })
                {
                    serializer.Serialize(jsonWriter, parsed);
                    jsonWriter.Flush();
                    return textWriter.ToString();
                }
            }
            catch (JsonException)
            {
                // Payload is not JSON: show it as received.
                return str;
            }
        }

        /// <summary>Unsubscribe button: unsubscribes from the topic currently in the form.</summary>
        private async void btn_cancel_subscribe_Click(object sender, EventArgs e)
        {
            var client = this.mqttClient;
            var topic = this.tbx_subscribe_topic.Text.Trim();
            if (client == null || string.IsNullOrEmpty(topic))
            {
                return;
            }

            string[] topics = { topic };
            await client.UnsubscribeAsync(topics);
            AppendLogMsg($"已取消订阅Topic:{topic}!");
        }

        /// <summary>Clears the receive box.</summary>
        private void btn_clear_receive_Click(object sender, EventArgs e)
        {
            tbx_receive_msg.Clear();
        }

        /// <summary>
        /// Subscribes the current client to the topic typed in the form and logs it.
        /// Does nothing while disconnected; logs a hint when the topic is empty.
        /// </summary>
        private async Task SubscribeCurrentTopicAsync()
        {
            var client = this.mqttClient;
            if (client == null)
            {
                return;
            }

            var topic = this.tbx_subscribe_topic.Text.Trim();
            if (string.IsNullOrEmpty(topic))
            {
                AppendLogMsg("订阅Topic不能为空!");
                return;
            }

            await client.SubscribeAsync(new MqttTopicFilter { Topic = topic });
            AppendLogMsg($"订阅到Topic:{topic}!");
        }

        /// <summary>Stops and disposes the current client, if any, and clears the field.</summary>
        private async Task ReleaseClientAsync()
        {
            var client = this.mqttClient;
            this.mqttClient = null;
            if (client == null)
            {
                return;
            }

            try
            {
                await client.StopAsync();
            }
            finally
            {
                client.Dispose();
            }
        }

        /// <summary>
        /// Runs <paramref name="action"/> on the UI thread. The call is dropped when the
        /// form has no live window handle (e.g. a callback that arrives during shutdown).
        /// </summary>
        private void RunOnUiThread(Action action)
        {
            if (IsDisposed || !IsHandleCreated)
            {
                return;
            }

            if (InvokeRequired)
            {
                Invoke(action);
            }
            else
            {
                action();
            }
        }
    }
}

服务端

控制台程序

using Common;
using MQTTnet;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Text;
using System.Threading.Tasks;

namespace MQTT
{
    /// <summary>
    /// Console MQTT broker demo: starts an MQTTnet server on the configured port,
    /// validates connecting clients by user name/password, and prints server,
    /// client and message events to the console.
    /// </summary>
    class Program
    {
        /// <summary>Listening port, read from configuration key "Port".</summary>
        private readonly int Port = ConfigHelper.GetConfigInt("Port");

        /// <summary>The broker instance.</summary>
        private readonly IMqttServer server = new MqttFactory().CreateMqttServer();

        /// <summary>
        /// Starts the broker, waits for a key press on the console, then stops the broker.
        /// </summary>
        static void Main(string[] args)
        {
            Program program = new Program();

            try
            {
                // Block on startup so a failure (e.g. port already in use) is observed
                // instead of being lost in an un-awaited task. A console app has no
                // synchronization context, so blocking here cannot deadlock.
                program.StartMQTTAsync().GetAwaiter().GetResult();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"MQTT服务启动失败:{ex.Message}");
            }

            Console.Read();

            // Shut the broker down cleanly; StopAsync is a no-op if it never started.
            program.StopAsync().GetAwaiter().GetResult();
        }

        /// <summary>
        /// Configures the connection validator, endpoint port and event handlers,
        /// then starts the server.
        /// </summary>
        public async Task StartMQTTAsync()
        {
            MqttServerOptionsBuilder serverOptions = new MqttServerOptionsBuilder();

            serverOptions.WithConnectionValidator(client =>
            {
                // NOTE(review): demo-only hard-coded credentials — load them from
                // configuration or a credential store before any real deployment.
                if (client.Username == "test" && client.Password == "1234")
                {
                    client.ReasonCode = MqttConnectReasonCode.Success;
                    Console.WriteLine("校验成功");
                }
                else
                {
                    client.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                    Console.WriteLine("校验失败");
                }
            });
            serverOptions.WithDefaultEndpointPort(Port);

            // Server started.
            server.StartedHandler = new MqttServerStartedHandlerDelegate((Action<EventArgs>)StartedHandler);
            // Server stopped.
            server.StoppedHandler = new MqttServerStoppedHandlerDelegate((Action<EventArgs>)StoppedHandler);
            // Client connected.
            server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate((Action<MqttServerClientConnectedEventArgs>)ClientConnectedHandler);
            // Client disconnected.
            server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate((Action<MqttServerClientDisconnectedEventArgs>)ClientDisconnectedHandler);
            // Application message received.
            server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate((Action<MqttApplicationMessageReceivedEventArgs>)MessageReceivedHandler);
            // Client subscribed to a topic.
            server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate((Action<MqttServerClientSubscribedTopicEventArgs>)ClientSubscribedTopicHandler);
            // Client unsubscribed from a topic.
            server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate((Action<MqttServerClientUnsubscribedTopicEventArgs>)ClientUnsubscribedTopicHandler);

            await server.StartAsync(serverOptions.Build());
        }

        /// <summary>
        /// Server-started event: prints the listening port.
        /// </summary>
        /// <param name="obj">Event arguments (unused).</param>
        public void StartedHandler(EventArgs obj)
        {
            Console.WriteLine($"程序已经启动!监听端口为: {Port}");
        }

        /// <summary>
        /// Server-stopped event.
        /// </summary>
        /// <param name="obj">Event arguments (unused).</param>
        private void StoppedHandler(EventArgs obj)
        {
            Console.WriteLine("程序已经关闭");
        }

        /// <summary>
        /// Client-connected event: prints the client id.
        /// </summary>
        /// <param name="obj">Event arguments carrying the client id.</param>
        private void ClientConnectedHandler(MqttServerClientConnectedEventArgs obj)
        {
            Console.WriteLine($"{obj.ClientId}此客户端已经连接到服务器");
        }

        /// <summary>
        /// Client-disconnected event: prints the client id and the disconnect type.
        /// </summary>
        /// <param name="obj">Event arguments carrying the client id and disconnect type.</param>
        private void ClientDisconnectedHandler(MqttServerClientDisconnectedEventArgs obj)
        {
            Console.WriteLine($"断开连接的客户端:{obj.ClientId}");
            Console.WriteLine($"断开连接类型:{obj.DisconnectType}");
        }

        /// <summary>
        /// Message-received event: prints sender, topic and the payload decoded as UTF-8.
        /// </summary>
        /// <param name="obj">Event arguments carrying the client id and the message.</param>
        private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
        {
            // An empty message has a null payload; Encoding.GetString(null) would throw.
            byte[] payload = obj.ApplicationMessage.Payload;
            string text = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

            Console.WriteLine("===================================================");
            Console.WriteLine("收到消息:");
            Console.WriteLine($"客户端:{obj.ClientId}");
            Console.WriteLine($"主题:{obj.ApplicationMessage.Topic}");
            Console.WriteLine($"消息:{text}");
            Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
            Console.WriteLine();
        }

        /// <summary>
        /// Client-subscribed event: prints the client id and the subscribed topic.
        /// </summary>
        /// <param name="obj">Event arguments carrying the client id and topic filter.</param>
        private void ClientSubscribedTopicHandler(MqttServerClientSubscribedTopicEventArgs obj)
        {
            Console.WriteLine($"客户端:{obj.ClientId}");
            Console.WriteLine($"订阅主题:{obj.TopicFilter.Topic}");
        }

        /// <summary>
        /// Client-unsubscribed event: prints the client id and the topic filter.
        /// </summary>
        /// <param name="obj">Event arguments carrying the client id and topic filter.</param>
        private void ClientUnsubscribedTopicHandler(MqttServerClientUnsubscribedTopicEventArgs obj)
        {
            Console.WriteLine($"客户端:{obj.ClientId}");
            Console.WriteLine($"取消订阅主题:{obj.TopicFilter}");
        }

        /// <summary>
        /// Stops and disposes the server if it is currently started.
        /// </summary>
        /// <returns>A task that completes when the server has stopped.</returns>
        public async Task StopAsync()
        {
            if (server == null || !server.IsStarted)
            {
                return;
            }

            await server.StopAsync();
            server.Dispose();
        }
    }
}

服务端及客户端Demo地址:C#基于MQTTNet的服务端与客户端通信案例-C#文档类资源-CSDN下载

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。