spring boot+netty 搭建MQTT broken
一、项目结构
二、安装依赖
<!-- netty包 --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.75.Final</version></dependency><!-- 常用JSON工具包 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.80</version></dependency><!--mqtt服务端--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
三、配置文件
server:port: 8880
netty:mqtt:port: 1883
# mqtt账号username: admin
#mqtt密码password: 123456
# 日记配置
logging:level:# 开启debug日记打印com.hyx: debug
四、新建具体类
1、新建BootNettyMqttMsgBack类
package com.example.springnettymqtt.MQTTServer.callback;import com.example.springnettymqtt.MQTTServer.config.MQTTServerProperties;
import io.netty.channel.Channel;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import io.netty.channel.ChannelId;
import io.netty.handler.codec.mqtt.*;import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;import static com.example.springnettymqtt.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceAdd;
import static com.example.springnettymqtt.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
import static com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer.*;@Component
@RequiredArgsConstructor
public class BootNettyMqttMsgBack {private static final Logger log = LoggerFactory.getLogger(BootNettyMqttMsgBack.class);private final MQTTServerProperties MQTTserverProperties;/*** 确认连接请求* @param channel* @param mqttMessage*/public void connack (Channel channel, MqttMessage mqttMessage) {MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();// 构建返回报文, 可变报头MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);// 构建CONNACK消息体MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);//log.info("back--"+connAck.toString());log.debug("设备上线,channelId:{}", channel.id());MQTTdeviceAdd(channel);channel.writeAndFlush(connAck);}public void disconnack (Channel channel, MqttMessage mqttMessage) {MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();// 构建返回报文, 可变报头MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BANNED, mqttConnectVariableHeaderInfo.isCleanSession());// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.DISCONNECT,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);// 构建CONNACK消息体MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);//log.info("back--"+connAck.toString());channel.writeAndFlush(connAck);log.debug("设备下线,channelId:{}", channel.id());MQTTdeviceRemove(channel);}/*** 根据qos发布确认* @param channel* @param mqttMessage*/public void puback (Channel channel, MqttMessage mqttMessage) {MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();MqttQoS qos = mqttFixedHeaderInfo.qosLevel();//注意: readableBytes会改变写指针位置,使后续推送数据时,读取数据为空,需要重置 读指针byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];mqttPublishMessage.payload().readBytes(headBytes);String data = new String(headBytes);System.out.println("publish data-->"+data);//重置读取的指针mqttPublishMessage.payload().resetReaderIndex();switch (qos) {case AT_MOST_ONCE: // 至多一次//推送到订阅的客户端subscribSend(mqttMessage);break;case AT_LEAST_ONCE: // 至少一次// 构建返回报文, 可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);// 构建PUBACK消息体MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);log.info("back--"+pubAck.toString());channel.writeAndFlush(pubAck);//推送到订阅的客户端subscribSend(mqttMessage);break;case EXACTLY_ONCE: // 刚好一次// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);// 构建返回报文, 可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);//服务端收到publis的QoS2的消息之后,服务端需要保存一个msgid的记录,并且进入一个状态,// 即之后不管来了几个这个msgid的消息,都不管他,认为是重复的,丢弃。//接收到publish的QoS2消息之后,不能马上投递给上层,而是在本地做持久化,将消息保存起来。int mqttMessageId=mqttPublishMessage.variableHeader().packetId();if(!mqttMessageIdMap.containsKey(mqttMessageId)){//不存在此消息,将此消息暂存mqttMessageIdMap.put(mqttMessageId, mqttMessage);log.info("消息ID"+mqttMessageId+"-->Qos2级别消息,消息缓存");}else{//重复发送消息,直接返回log.info(mqttPublishMessage.variableHeader().packetId()+"消息重复:"+mqttPublishMessage.fixedHeader().isDup());return;}channel.writeAndFlush(mqttMessageBack);break;default:break;}}/*** 发布完成 qos2* @param channel* @param mqttMessage*/public void pubcomp (Channel channel, MqttMessage mqttMessage) {MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0x02);// 构建返回报文, 可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);//log.info("back--"+mqttMessageBack.toString());channel.writeAndFlush(mqttMessageBack);}/*** 订阅确认* @param channel* @param mqttMessage*/public void suback(Channel channel, MqttMessage mqttMessage) {MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();// 构建返回报文, 可变报头MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());//log.info(topics.toString());List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());for (int i = 0; i < topics.size(); i++) {grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());}// 构建返回报文 有效负载MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);// 构建返回报文 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size());// 构建返回报文 订阅确认MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);channel.writeAndFlush(subAck);}/*** 取消订阅确认* @param channel* @param mqttMessage*/public void unsuback(Channel channel, MqttMessage mqttMessage) {MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();// 构建返回报文 可变报头MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());// 构建返回报文 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);// 构建返回报文 取消订阅确认MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);channel.writeAndFlush(unSubAck);}/*** 心跳响应* @param channel* @param mqttMessage*/public void pingresp (Channel channel, MqttMessage mqttMessage) {// 心跳响应报文 11010000 00000000 固定报文MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);channel.writeAndFlush(mqttMessageBack);}/*** 订阅推送*/public void subscribSend(MqttMessage mqttMessage){MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;Object obj=mqttMessage.variableHeader();MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;String topicName=variableHeader.topicName();int packetId=variableHeader.packetId();//固定消息头 注意此处的消息类型PUBLISH mqtt协议MqttFixedHeader FixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.AT_LEAST_ONCE,false,0);//可变消息头MqttPublishVariableHeader mqttPublishVariableHeader=new MqttPublishVariableHeader(topicName,packetId);//推送消息体MqttPublishMessage mqttPublishMessageResult=new MqttPublishMessage(FixedHeader,mqttPublishVariableHeader, mqttPublishMessage.content());log.info("推送地址————》"+topicName);if(subscribeMap.containsKey(topicName)){List<ChannelId> channelList=subscribeMap.get(topicName);for (int i = 0; i < channelList.size(); i++) {//订阅次此topic的Mqtt客户端搜到此消息,Channel channelSub=MQTTdeviceChannelGroup.find(channelList.get(i));//writeAndFlush会将ByteBuf的引用释放refCnt会减去1,使用retain加1mqttPublishMessageResult.retain();channelSub.writeAndFlush(mqttPublishMessageResult);}mqttPublishMessageResult.release();}}/*** 用户鉴权*/public boolean authentication(MqttConnectPayload payload){String username=MQTTserverProperties.getUsername();String password=MQTTserverProperties.getPassword();//无账号或者无密码通过if(stringEmptyCheck(password)||stringEmptyCheck(username)){return true;}else {//消息中账号密码为空if(payload.passwordInBytes()==null||payload.userName()==null){return false;}String passwordAuthen=new String(payload.passwordInBytes());String usernameAuthen=payload.userName();if(password.equals(passwordAuthen)&&username.equals(usernameAuthen)){return true;}else {return false;}}}//判断字符字符为空private boolean stringEmptyCheck(String str){if(str==null||"".equals(str)){return true;}else {return false;}}
}
2、新建MqttChannelInit类
package com.example.springnettymqtt.MQTTServer.channel;import com.example.springnettymqtt.MQTTServer.handler.MQTTMessageHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;@Component
@RequiredArgsConstructor
public class MqttChannelInit extends ChannelInitializer<SocketChannel> {@Autowiredprivate MQTTMessageHandler MQTTmessageHandler;@Overrideprotected void initChannel(SocketChannel channel) {channel.pipeline()// 心跳时间.addLast("idle", new IdleStateHandler(600, 600, 1200, TimeUnit.SECONDS)).addLast("encoder", MqttEncoder.INSTANCE).addLast("decoder", new MqttDecoder()).addLast(MQTTmessageHandler);}
}
3、新建MQTTDeviceManerger类
package com.example.springnettymqtt.MQTTServer.channel;import lombok.extern.slf4j.Slf4j;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;import java.util.List;
import java.util.Map;import static com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer.*;@Slf4j
public class MQTTDeviceManerger {/*** 设备接入*/public static void MQTTdeviceAdd(Channel channel) {if(!MQTTdeviceChannelGroup.contains(channel)) {MQTTdeviceChannelGroup.add(channel);}}/*** 设备移除和和订阅的topic*/public static void MQTTdeviceRemove(Channel channel) {if(MQTTdeviceChannelGroup.contains(channel)) {MQTTdeviceChannelGroup.remove(channel);MQTTremoveDeviceChannelId(channel.id());//移除topic中的这个设备的chanelidfor (Map.Entry<String, List<ChannelId>> listEntry : subscribeMap.entrySet()) {try {if (listEntry.getValue().contains(channel.id())) {listEntry.getValue().remove(channel.id());log.info(channel.id() + "下线,topic: " + listEntry.getKey() + "中移除此id");}} catch (Exception e) {e.printStackTrace();}}}}public static void MQTTremoveDeviceChannelId(ChannelId channelId) {MQTTdeviceMap.entrySet().removeIf(item -> item.getValue().equals(channelId));}
}
4、新建配置类
package com.example.springnettymqtt.MQTTServer.config;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;@Configuration
@ConfigurationProperties(prefix = MQTTServerProperties.MQTTPREFIX)
@Data
public class MQTTServerProperties {public static final String MQTTPREFIX = "netty.mqtt";/*** 服务器端口*/private Integer port;/*** mqtt服务器用户名*/private String username;/*** mqtt服务器密码*/private String password;
}
5、新建MQTTMessageHandler类
package com.example.springnettymqtt.MQTTServer.handler;import com.example.springnettymqtt.MQTTServer.callback.BootNettyMqttMsgBack;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import io.netty.channel.*;
import io.netty.handler.codec.mqtt.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;import static com.example.springnettymqtt.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
import static com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer.*;@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class MQTTMessageHandler extends ChannelInboundHandlerAdapter {@Autowiredprivate BootNettyMqttMsgBack bootNettyMqttMsgBack;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (null != msg) {MqttMessage mqttMessage = (MqttMessage) msg;log.info("info--"+mqttMessage.toString());MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();Channel channel = ctx.channel();if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){//用户鉴权(配置文件中配置账号和密码,如果没有默认)log.warn("正在尝试鉴权");boolean authentag= bootNettyMqttMsgBack.authentication((MqttConnectPayload)mqttMessage.payload());if(!authentag){return;}// 在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接if(MQTTdeviceChannelGroup.contains(channel)){//移除次设备channel和topicbootNettyMqttMsgBack.disconnack(channel,mqttMessage);}// to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息bootNettyMqttMsgBack.connack(channel, mqttMessage);}//对于没有鉴权的设备,请求不处理if(!MQTTdeviceChannelGroup.contains(channel)){log.warn(channel.id()+"无鉴权操作");return;}switch (mqttFixedHeader.messageType()){case PUBLISH: // 客户端发布消息// PUBACK报文是对QoS 1等级的PUBLISH报文的响应bootNettyMqttMsgBack.puback(channel, mqttMessage);break;// PUBREL Qos2级别消息,客户端返回case PUBREL:// PUBREL(客户端发给服务端)报文是对PUBREC(服务端发给客户端)报文的响应//服务端收到pubrel之后,正式将消息投递给上层应用层。MqttMessageIdVariableHeader VariableHeader=(MqttMessageIdVariableHeader)mqttMessage.variableHeader();if(mqttMessageIdMap.containsKey(VariableHeader.messageId())) {log.warn("移除消息缓存-->消息id"+VariableHeader.messageId());bootNettyMqttMsgBack.subscribSend(mqttMessageIdMap.get(VariableHeader.messageId()));bootNettyMqttMsgBack.pubcomp(channel, mqttMessage);mqttMessageIdMap.remove(VariableHeader.messageId());}else {//后续多次收到REL消息,制作comp响应bootNettyMqttMsgBack.pubcomp(channel, mqttMessage);}break;case SUBSCRIBE: // 客户端订阅主题// 客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。// 为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。// SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端// to dobootNettyMqttMsgBack.suback(channel, mqttMessage);MqttSubscribePayload SubscribePayload=(MqttSubscribePayload) mqttMessage.payload();;for (int i = 0; i < SubscribePayload.topicSubscriptions().size(); i++) {String topicname=SubscribePayload.topicSubscriptions().get(i).topicName();boolean tag=subscribeMap.containsKey(topicname);if(tag){List<ChannelId> channelIds=subscribeMap.get(topicname);if(!channelIds.contains(channel.id())) {channelIds.add(channel.id());}else {log.warn(channel.id()+"重复订阅");}subscribeMap.put(topicname, channelIds);}else {List<ChannelId> channelIds=new ArrayList<>();channelIds.add(channel.id());subscribeMap.put(topicname,channelIds);}log.info(channel.id()+"订阅地址————》"+topicname);}break;case UNSUBSCRIBE: // 客户端取消订阅// 客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题// to dobootNettyMqttMsgBack.unsuback(channel, mqttMessage);Object Unsubscribe=mqttMessage.payload();MqttUnsubscribePayload unsubscribePayload=(MqttUnsubscribePayload)Unsubscribe;int len=unsubscribePayload.topics().size();for (int i = 0; i < len; i++) {String topicname=unsubscribePayload.topics().get(i);boolean tag=subscribeMap.containsKey(topicname);if(tag){List<ChannelId> channelIds=subscribeMap.get(topicname);channelIds.remove(channel.id());subscribeMap.put(topicname,channelIds);}else {log.error("不存在订阅地址——>"+topicname);}log.info(channel.id()+"取消订阅地址————》"+topicname);}break;case PINGREQ: // 客户端发起心跳// 客户端发送PINGREQ报文给服务端的// 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着// 请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开bootNettyMqttMsgBack.pingresp(channel, mqttMessage);break;case DISCONNECT: // 客户端主动断开连接log.debug("设备下线,channelId:{}", channel.id());MQTTdeviceRemove(channel);// DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0// to dobreak;default:break;}}else {return;}}/*** 从客户端收到新的数据、读取完成时调用*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws IOException {}/*** 客户端与服务端第一次建立连接时执行 在channelActive方法之前执行*/@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {super.channelRegistered(ctx);}/*** 客户端与服务端 断连时执行 channelInactive方法之后执行*/@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.warn(ctx.channel().id()+"连接断开");MQTTdeviceRemove(ctx.channel());super.channelUnregistered(ctx);}/*** 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {Channel channel = ctx.channel();log.warn(channel.id()+"连接异常断开。。。。。。。");MQTTdeviceRemove(ctx.channel());super.exceptionCaught(ctx, cause);if(channel.isActive()){ctx.close();}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {log.debug("\n");}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);}/*** 服务端 当读超时时 会调用这个方法*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {super.userEventTriggered(ctx, evt);ctx.close();}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {super.channelWritabilityChanged(ctx);}
}
6、新建接口IMQTTServer及其实现类MQTTServer
package com.example.springnettymqtt.MQTTServer.server;import javax.annotation.PreDestroy;public interface IMQTTServer {/*** 主启动程序,初始化参数** @throws Exception 初始化异常*/void start() throws Exception;/*** 优雅的结束服务器** @throws InterruptedException 提前中断异常*/@PreDestroyvoid destroy() throws InterruptedException;
}
package com.example.springnettymqtt.MQTTServer.server.impl;import com.example.springnettymqtt.MQTTServer.channel.MqttChannelInit;
import com.example.springnettymqtt.MQTTServer.config.MQTTServerProperties;
import com.example.springnettymqtt.MQTTServer.server.IMQTTServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Component
@Slf4j
@RequiredArgsConstructor
public class MQTTServer implements IMQTTServer {private final MqttChannelInit mqttChannelInit;private final MQTTServerProperties MQTTserverProperties;//保存接入的MQTT设备channelpublic static ChannelGroup MQTTdeviceChannelGroup;//保存订阅地址和chanelid,当推送数据时,会向此订阅地址的多个channel发送数据public static Map<String, List<ChannelId>> subscribeMap =new ConcurrentHashMap();//保存设备名称和通道编号,向设备发送消息可以通过名称找到通道public static ConcurrentHashMap<String, ChannelId> MQTTdeviceMap = new ConcurrentHashMap<>();//存放Qos2消息等级的消息ID,这里可使用redis之类的工具做缓存,为了简化配置,使用map暂存public static ConcurrentHashMap<Integer, MqttMessage> mqttMessageIdMap=new ConcurrentHashMap();private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;@Overridepublic void start() {log.info("初始化 Mqttserver ...");bossGroup = new NioEventLoopGroup();workerGroup = new NioEventLoopGroup();MQTTdeviceChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);this.MqttServer();}/*** 初始化*/private void MqttServer() {try {new ServerBootstrap().group(bossGroup, workerGroup).channel( NioServerSocketChannel.class ).localAddress(new InetSocketAddress(MQTTserverProperties.getPort()))// 配置 编码器、解码器、业务处理.childHandler(mqttChannelInit)// tcp缓冲区.option(ChannelOption.SO_BACKLOG, 128)// 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true.childOption(ChannelOption.TCP_NODELAY, true)// 保持长连接.childOption(ChannelOption.SO_KEEPALIVE, true)// 绑定端口,开始接收进来的连接.bind().sync();log.info("MQTT服务启动成功!开始监听端口:{}", MQTTserverProperties.getPort());} catch (Exception e) {e.printStackTrace();bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}/*** 销毁*/@PreDestroy@Overridepublic void destroy() {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}
}
7、新建启动类
package com.example.springnettymqtt.startServer;import com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class StartSrver {@Autowiredprivate MQTTServer mqttServer;@PostConstructpublic void startNetty(){new Thread(() -> {try {mqttServer.start();} catch (Exception e) {e.printStackTrace();}}).start();}
}