Unity实现MQTT服务器
首先下载 MQTTnet:MQTTnet 下载地址(https://github.com/dotnet/MQTTnet)
解压好后使用vs打开,并生成.dll文件(我这里下载的是4.1.2.350版本)
然后在 /Source/MQTTnet/bin/Debug/net452 文件夹中找到生成的文件
新建unity工程,创建Plugins文件夹,将文件复制到该文件夹内
然后新建脚本MyMqttServer,代码内带有注释
using MQTTnet.Adapter;
using MQTTnet.Server;
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using MQTTnet;
using System.Threading.Tasks;
using System;
using System.Diagnostics.Tracing;
using System.Net;
using MQTTnet.Diagnostics;
using System.Text;
using MQTTnet.Protocol;
// NOTE(review): this directive pulls in the UnityEditor assembly, which is not
// available in player builds. Nothing in the code visible here appears to use it;
// confirm and then remove it or wrap it in #if UNITY_EDITOR.
using static UnityEditor.ObjectChangeEventStream;

namespace MQTT.Server
{
    /// <summary>
    /// Identity data recorded for a client that connected to the broker.
    /// </summary>
    public class MqttClientObject
    {
        /// <summary>
        /// Creates a client record.
        /// </summary>
        /// <param name="clientID">MQTT client identifier.</param>
        /// <param name="userName">User name supplied by the client, if any.</param>
        /// <param name="password">Password supplied by the client, if any.</param>
        public MqttClientObject(string clientID, string userName = "", string password = "")
        {
            this.ClientID = clientID;
            this.UserName = userName;
            this.PassWord = password;
        }

        /// <summary>MQTT client identifier.</summary>
        public string ClientID { get; set; }

        /// <summary>User name supplied by the client.</summary>
        public string UserName { get; set; }

        /// <summary>Password supplied by the client.</summary>
        public string PassWord { get; set; }
    }

    /// <summary>
    /// Thin wrapper around an MQTTnet <see cref="MqttServer"/> that starts/stops the
    /// broker, logs its events through <see cref="Debug"/> and keeps a list of the
    /// currently connected clients.
    /// </summary>
    public class MyMqttServer
    {
        /// <summary>
        /// Wildcard characters that may appear in a topic filter but never in a topic name.
        /// </summary>
        private static readonly char[] TopicWildcards = { '#', '+' };

        /// <summary>
        /// The broker instance; created lazily by <see cref="StartMqttServer"/>.
        /// </summary>
        private MqttServer m_MqttServer = null;

        /// <summary>
        /// Clients that are currently connected.
        /// </summary>
        private readonly List<MqttClientObject> m_MqttClientObject = new List<MqttClientObject>();

        /// <summary>
        /// Guards <see cref="m_MqttClientObject"/>; the connect and disconnect handlers
        /// both mutate the list and are invoked by the server, not by the caller's thread.
        /// </summary>
        private readonly object m_ClientLock = new object();

        /// <summary>
        /// Creates the broker on first use and starts it.
        /// </summary>
        /// <param name="port">
        /// TCP port of the default endpoint. Only applied by the call that creates the
        /// server instance; later calls reuse the existing instance and its options.
        /// </param>
        public void StartMqttServer(int port = 1883)
        {
            if (m_MqttServer == null)
            {
                var factory = new MqttFactory();
                var options = factory.CreateServerOptionsBuilder()
                    .WithDefaultEndpoint()
                    .WithDefaultEndpointPort(port)
                    .WithPersistentSessions(true)
                    // Default communication timeout of 60 seconds.
                    .WithDefaultCommunicationTimeout(TimeSpan.FromMilliseconds(60000))
                    .Build();

                m_MqttServer = factory.CreateMqttServer(options);

                // Handlers are attached exactly once, together with the instance they
                // belong to. Attaching them on every start would make each event fire
                // once per previous start.
                m_MqttServer.StartedAsync += ServerStarted;
                m_MqttServer.StoppedAsync += ServerStoped;
                m_MqttServer.ClientConnectedAsync += ClientConnected;
                m_MqttServer.ClientDisconnectedAsync += ClientDisconnected;
                m_MqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopic;
                m_MqttServer.ClientUnsubscribedTopicAsync += ClientUnSubscribedTopic;
                m_MqttServer.ValidatingConnectionAsync += ValidatingConnection;
                m_MqttServer.InterceptingPublishAsync += InterceptingPublish;
            }

            if (m_MqttServer.IsStarted)
            {
                Debug.LogWarning("MQTT server is already started.");
                return;
            }

            ObserveFault(m_MqttServer.StartAsync(), "start");
        }

        /// <summary>
        /// Stops the broker if it has been created.
        /// </summary>
        public void StopMqttServer()
        {
            if (m_MqttServer != null)
            {
                ObserveFault(m_MqttServer.StopAsync(), "stop");
            }
        }

        /// <summary>
        /// Reports whether the broker is running.
        /// </summary>
        /// <returns><c>true</c> when the server exists and reports itself as started.</returns>
        public bool GetMqttServerState()
        {
            return m_MqttServer != null && m_MqttServer.IsStarted;
        }

        /// <summary>
        /// Logs a failure of a fire-and-forget server operation instead of letting the
        /// exception go unobserved.
        /// </summary>
        /// <param name="task">Task returned by the server call.</param>
        /// <param name="operation">Short name of the operation, used in the log message.</param>
        private static void ObserveFault(Task task, string operation)
        {
            if (task == null)
            {
                return;
            }

            task.ContinueWith(
                t => Debug.LogError($"MQTT server {operation} failed: {t.Exception?.GetBaseException().Message}"),
                System.Threading.CancellationToken.None,
                TaskContinuationOptions.OnlyOnFaulted,
                TaskScheduler.Default);
        }

        /// <summary>
        /// Invoked after the server has started.
        /// </summary>
        private Task ServerStarted(EventArgs eventArgs)
        {
            Debug.Log("服务: started!");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Invoked after the server has stopped.
        /// </summary>
        private Task ServerStoped(EventArgs eventArgs)
        {
            Debug.Log("服务: stoped!");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Invoked after a client connected; records it in the connected-client list.
        /// </summary>
        private Task ClientConnected(ClientConnectedEventArgs eventArgs)
        {
            Debug.Log($"服务:client Connected ClientId:{eventArgs.ClientId}");

            lock (m_ClientLock)
            {
                // A reconnect with the same id replaces the previous record.
                m_MqttClientObject.RemoveAll(c => c.ClientID == eventArgs.ClientId);
                m_MqttClientObject.Add(new MqttClientObject(eventArgs.ClientId, eventArgs.UserName));
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Invoked after a client subscribed; logs it and announces the subscription on
        /// the subscribed topic.
        /// </summary>
        private Task ClientSubscribedTopic(ClientSubscribedTopicEventArgs eventArgs)
        {
            string topic = eventArgs.TopicFilter.Topic;
            Debug.Log($"Client:{eventArgs.ClientId} -- Subscribed -- Topic:{topic}");

            // A topic filter may contain wildcards, but a message must be published to a
            // concrete topic name, so the announcement is skipped for wildcard filters.
            bool isConcreteTopic = !string.IsNullOrEmpty(topic) && topic.IndexOfAny(TopicWildcards) < 0;
            if (isConcreteTopic)
            {
                ServerPublich(topic, $"Client:{eventArgs.ClientId} Subscribed this Topic");
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Invoked after a client unsubscribed from a topic filter.
        /// </summary>
        private Task ClientUnSubscribedTopic(ClientUnsubscribedTopicEventArgs eventArgs)
        {
            Debug.Log($"Client:{eventArgs.ClientId} -- Unsubscribed -- Topic:{eventArgs.TopicFilter}");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Invoked after a client disconnected; removes it from the connected-client list.
        /// </summary>
        private Task ClientDisconnected(ClientDisconnectedEventArgs eventArgs)
        {
            Debug.Log($"Client:client DisConnected ClientId:{eventArgs.ClientId}");

            lock (m_ClientLock)
            {
                m_MqttClientObject.RemoveAll(c => c.ClientID == eventArgs.ClientId);
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Invoked while a connecting client is being validated. Only logs the supplied
        /// credentials; it does not set a reason code or reject anything itself.
        /// </summary>
        private Task ValidatingConnection(ValidatingConnectionEventArgs args)
        {
            // NOTE(review): this writes the client's password to the log in clear text.
            // Acceptable for a local demo only; drop the password before real use.
            Debug.Log($"UserName:{args.UserName},PassWord:{args.Password}");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Invoked for every message a client publishes; logs sender, payload and topic.
        /// </summary>
        private Task InterceptingPublish(InterceptingPublishEventArgs args)
        {
            // A message without payload carries a null array, which GetString rejects.
            byte[] payload = args.ApplicationMessage.Payload;
            string text = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

            Debug.Log($"Client:{args.ClientId} send Message : {text} -- Topic:{args.ApplicationMessage.Topic}");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Publishes a message from the server itself to all subscribers of a topic.
        /// Logs a warning and does nothing when the server is not running.
        /// </summary>
        /// <param name="topic">Topic name to publish to.</param>
        /// <param name="message">Text payload.</param>
        /// <param name="isRetain">Whether the message is flagged as retained.</param>
        /// <param name="level">Quality-of-service level of the message.</param>
        public void ServerPublich(string topic, string message, bool isRetain = false, MqttQualityOfServiceLevel level = MqttQualityOfServiceLevel.ExactlyOnce)
        {
            if (m_MqttServer == null || !m_MqttServer.IsStarted)
            {
                Debug.LogWarning($"MQTT server is not running; message for topic '{topic}' was not published.");
                return;
            }

            var applicationMessage = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(message)
                .WithRetainFlag(isRetain)
                .WithQualityOfServiceLevel(level)
                .Build();

            var injected = new InjectedMqttApplicationMessage(applicationMessage);
            injected.SenderClientId = "1";

            ObserveFault(m_MqttServer.InjectApplicationMessage(injected), "publish");
        }
    }
}
创建脚本MyMqttClient,代码内带有注释
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using UnityEngine;

/// <summary>
/// Thin wrapper around an MQTTnet <see cref="IMqttClient"/> that connects to a broker
/// on construction and logs connection events and received messages through
/// <see cref="Debug"/>.
/// </summary>
public class MyMqttClient
{
    /// <summary>
    /// The underlying MQTTnet client.
    /// </summary>
    private readonly IMqttClient m_MqttClient = null;

    /// <summary>
    /// Client identifier sent to the broker.
    /// </summary>
    private readonly string m_ClientID;

    /// <summary>
    /// Creates the client and starts connecting to the broker. The connection is
    /// established asynchronously; the constructor returns before it completes, and a
    /// failed attempt is reported through the error log.
    /// </summary>
    /// <param name="clientID">Client identifier sent to the broker.</param>
    /// <param name="ip">Broker host name or IP address.</param>
    /// <param name="port">Broker TCP port.</param>
    public MyMqttClient(string clientID, string ip = "127.0.0.1", int port = 1883)
    {
        m_ClientID = clientID;

        var options = new MqttClientOptionsBuilder()
            .WithClientId(m_ClientID)
            .WithTcpServer(ip, port)
            .Build();

        m_MqttClient = new MqttFactory().CreateMqttClient();

        m_MqttClient.ConnectedAsync += ClientConnected;
        m_MqttClient.DisconnectedAsync += ClientDisConnected;
        m_MqttClient.ApplicationMessageReceivedAsync += ReceiveMsg;

        ObserveFault(m_MqttClient.ConnectAsync(options), "connect");
    }

    /// <summary>
    /// Logs a failure of a fire-and-forget client operation instead of letting the
    /// exception go unobserved.
    /// </summary>
    /// <param name="task">Task returned by the client call.</param>
    /// <param name="operation">Short name of the operation, used in the log message.</param>
    private void ObserveFault(Task task, string operation)
    {
        if (task == null)
        {
            return;
        }

        task.ContinueWith(
            t => Debug.LogError($"MQTT client '{m_ClientID}' {operation} failed: {t.Exception?.GetBaseException().Message}"),
            System.Threading.CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Invoked when a message arrives on a subscribed topic; logs its payload.
    /// </summary>
    private Task ReceiveMsg(MqttApplicationMessageReceivedEventArgs args)
    {
        // A message without payload carries a null array, which GetString rejects.
        byte[] payload = args.ApplicationMessage.Payload;
        string text = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

        // NOTE(review): args.ClientId looks like this client's own id rather than the
        // sender's, so the "From Client" wording may be misleading — confirm.
        Debug.Log($"Receive Message From Client:{args.ClientId} msg:{text}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Invoked after the connection to the broker was closed.
    /// </summary>
    private Task ClientDisConnected(MqttClientDisconnectedEventArgs args)
    {
        Debug.Log("disConnected");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Invoked after the connection to the broker was established.
    /// </summary>
    private Task ClientConnected(MqttClientConnectedEventArgs args)
    {
        Debug.Log("connected");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Publishes a text message. The call does not wait for completion; a failed
    /// publish is reported through the error log.
    /// </summary>
    /// <param name="topic">Topic name to publish to.</param>
    /// <param name="message">Text payload.</param>
    /// <param name="level">Quality-of-service level of the message.</param>
    /// <param name="isRetain">Whether the message is flagged as retained.</param>
    public void PublishMsg(string topic, string message, MqttQualityOfServiceLevel level = MqttQualityOfServiceLevel.ExactlyOnce, bool isRetain = false)
    {
        ObserveFault(m_MqttClient.PublishStringAsync(topic, message, level, isRetain), "publish");
    }

    /// <summary>
    /// Subscribes to a topic. The call does not wait for completion; a failed
    /// subscription is reported through the error log.
    /// </summary>
    /// <param name="topic">Topic filter to subscribe to.</param>
    public void Subscribe(string topic)
    {
        var filter = new MqttTopicFilterBuilder().WithTopic(topic).Build();
        ObserveFault(m_MqttClient.SubscribeAsync(filter), "subscribe");
    }
}
到这服务器与客户端脚本完成,接下来只需要写一些测试脚本,调用上面脚本的接口,即可完成服务器的开启/关闭,客户端连接,收发消息等功能。