Java连接Emqx实现订阅发布消息
一:前提
安装了Emqx开源版、MQTTX客户端
二:订阅发布实现步骤
1.引入依赖
<!--MQTT客户端-->
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version>
</dependency>
2.编辑配置文件
mqtt:broker:uri: tcp://127.0.0.1:31883client:id: mqtt-am-client-${random.uuid}# 订阅主题配置(支持多个)inTopics:- topic: test/topic1qos: 0- topic: test/topic2qos: 1- topic: test/topic3qos: 2# 发布主题配置(支持多个)outTopics:- topic: out/topic1qos: 0username: ampassword: LGyPtuAB4th5pkeepAliveInterval: 60
3.读取配置文件
package com.wtzn.web.config;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;import java.util.List;@Configuration
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttProperties {private Broker broker;private Client client;private List<TopicConfig> inTopics;private List<TopicConfig> outTopics;private String userName;private String password;private int KeepAliveInterval;@Datapublic static class Broker {private String uri;}@Datapublic static class Client {private String id;}@Datapublic static class TopicConfig {private String topic;private int qos;}}
4.创建Mqtt客户端
package com.wtzn.web.config;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqttConfig {@Autowiredprivate MqttProperties mqttProperties;@Beanpublic MqttClient mqttClient() throws MqttException {MqttClient client = new MqttClient(mqttProperties.getBroker().getUri(), mqttProperties.getClient().getId(), new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();// 此客户端的用户名和密码options.setUserName(mqttProperties.getUserName());options.setPassword(mqttProperties.getPassword().toCharArray());options.setCleanSession(true);// 设置遗嘱消息// options.setWill(mqttProperties.getOutTopic(), "我是mqtt-am-client,我已下线,这是我的遗嘱".getBytes(), 2, true);// 连接超时重试options.setConnectionTimeout(5000); //毫秒options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());options.setAutomaticReconnect(true);//网络中断重连client.connect(options);return client;}
}
5.controller层
package com.wtzn.web.controller;import cn.dev33.satoken.annotation.SaIgnore;
import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.domain.bo.Payload;
import com.wtzn.web.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.util.LinkedList;@RestController
@Slf4j
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttService mqttService;@SaIgnore@PostMapping("/mqtt")public void publish() {try {// LinkedList<Payload> payloadLinkedList=new LinkedList<>();for(int i=1; i<=10000; i++){Payload payload=new Payload();payload.setTemperature(i);// payloadLinkedList.add(payload);mqttService.publish("test/topic1",0,JsonUtils.toJsonString(payload));}} catch (MqttException e) {log.error("发布消息失败{}", e.getMessage());}log.info("发布消息成功");}}
6.service层
package com.wtzn.web.service;import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.config.MqttProperties;
import com.wtzn.web.domain.bo.Payload;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Arrays;@Service
@Slf4j
public class MqttService implements MqttCallbackExtended {@Autowiredprivate MqttClient mqttClient;@Autowiredprivate MqttProperties mqttProperties;@PostConstructpublic void init() throws MqttException {mqttClient.setCallback(this);/* mqttClient.subscribe(mqttProperties.getInTopic());log.info("订阅主题{}", mqttProperties.getInTopic());
*/mqttProperties.getInTopics().forEach(x -> {try {mqttClient.subscribe(x.getTopic(), x.getQos());log.info("订阅主题{}", x.getTopic());} catch (MqttException e) {throw new RuntimeException(e);}});}@PreDestroypublic void destroy() throws MqttException {mqttClient.disconnect();log.info("与服务器断开连接");}/*** @description: 发送消息* @param: [message]* @return: void**/public void publish(String topic,int qos,String message) throws MqttException {MqttMessage mqttMessage = new MqttMessage(message.getBytes());mqttMessage.setQos(qos);mqttClient.publish(topic, mqttMessage);log.info("向主题【{}】发布消息:【{}】", topic, message);}/*** @description: 接收消息* @param: [topic, message]* @return: void**/@Overridepublic void messageArrived(String topic, MqttMessage message) throws MqttException {Payload payload = JsonUtils.parseObject(new String(message.getPayload()), Payload.class);log.info("接收到来自【{}】的消息【{}】", topic, payload.getTemperature());/* if (payload.getTemperature() > 37) {publish("发烧");}*/}@Overridepublic void connectionLost(Throwable cause) {log.error("连接丢失:{}", cause.getMessage());}@SneakyThrows@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {if( token!=null ){MqttMessage message = null;try {message = token.getMessage();} catch (MqttException e) {throw new RuntimeException(e);}String topic = token.getTopics()==null ? null : Arrays.asList(token.getTopics()).toString();String str = message==null ? null : new String(message.getPayload());log.info("deliveryComplete: topic={}, message={}", topic, str);} else {log.info("deliveryComplete: null");}log.info("消息已送达");}@Overridepublic void connectComplete(boolean b, String s) {mqttProperties.getInTopics().forEach(x -> {try {mqttClient.subscribe(x.getTopic(), x.getQos());log.info("订阅主题{}", x.getTopic());} catch (MqttException e) {throw new RuntimeException(e);}});}
}
7.dao层
package com.wtzn.web.domain.bo;import lombok.Data;@Data
public class Payload {private Integer temperature;
}