当前位置: 首页 > news >正文

Java连接Emqx实现订阅发布消息

一:前提

        安装了Emqx开源版、MQTTX客户端

二:订阅发布实现步骤

1.引入依赖 

<!--MQTT客户端-->
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version>
</dependency>

2.编辑配置文件

mqtt:broker:uri: tcp://127.0.0.1:31883client:id: mqtt-am-client-${random.uuid}# 订阅主题配置(支持多个)inTopics:- topic: test/topic1qos: 0- topic: test/topic2qos: 1- topic: test/topic3qos: 2# 发布主题配置(支持多个)outTopics:- topic: out/topic1qos: 0username: ampassword: LGyPtuAB4th5pkeepAliveInterval: 60

3.读取配置文件

package com.wtzn.web.config;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;import java.util.List;@Configuration
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttProperties {private Broker broker;private Client client;private List<TopicConfig> inTopics;private List<TopicConfig> outTopics;private String userName;private String password;private int KeepAliveInterval;@Datapublic static class Broker {private String uri;}@Datapublic static class Client {private String id;}@Datapublic static class TopicConfig {private String topic;private int qos;}}

4.创建Mqtt客户端

package com.wtzn.web.config;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqttConfig {@Autowiredprivate MqttProperties mqttProperties;@Beanpublic MqttClient mqttClient() throws MqttException {MqttClient client = new MqttClient(mqttProperties.getBroker().getUri(), mqttProperties.getClient().getId(), new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();// 此客户端的用户名和密码options.setUserName(mqttProperties.getUserName());options.setPassword(mqttProperties.getPassword().toCharArray());options.setCleanSession(true);// 设置遗嘱消息//  options.setWill(mqttProperties.getOutTopic(), "我是mqtt-am-client,我已下线,这是我的遗嘱".getBytes(), 2, true);// 连接超时重试options.setConnectionTimeout(5000); //毫秒options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());options.setAutomaticReconnect(true);//网络中断重连client.connect(options);return client;}
}

5.controller层

package com.wtzn.web.controller;import cn.dev33.satoken.annotation.SaIgnore;
import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.domain.bo.Payload;
import com.wtzn.web.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.util.LinkedList;@RestController
@Slf4j
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttService mqttService;@SaIgnore@PostMapping("/mqtt")public void publish() {try {//  LinkedList<Payload> payloadLinkedList=new LinkedList<>();for(int i=1; i<=10000; i++){Payload payload=new Payload();payload.setTemperature(i);//  payloadLinkedList.add(payload);mqttService.publish("test/topic1",0,JsonUtils.toJsonString(payload));}} catch (MqttException e) {log.error("发布消息失败{}", e.getMessage());}log.info("发布消息成功");}}

6.service层

package com.wtzn.web.service;import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.config.MqttProperties;
import com.wtzn.web.domain.bo.Payload;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Arrays;@Service
@Slf4j
public class MqttService implements MqttCallbackExtended {@Autowiredprivate MqttClient mqttClient;@Autowiredprivate MqttProperties mqttProperties;@PostConstructpublic void init() throws MqttException {mqttClient.setCallback(this);/*       mqttClient.subscribe(mqttProperties.getInTopic());log.info("订阅主题{}", mqttProperties.getInTopic());
*/mqttProperties.getInTopics().forEach(x -> {try {mqttClient.subscribe(x.getTopic(), x.getQos());log.info("订阅主题{}", x.getTopic());} catch (MqttException e) {throw new RuntimeException(e);}});}@PreDestroypublic void destroy() throws MqttException {mqttClient.disconnect();log.info("与服务器断开连接");}/*** @description: 发送消息* @param: [message]* @return: void**/public void publish(String topic,int qos,String message) throws MqttException {MqttMessage mqttMessage = new MqttMessage(message.getBytes());mqttMessage.setQos(qos);mqttClient.publish(topic, mqttMessage);log.info("向主题【{}】发布消息:【{}】", topic, message);}/*** @description: 接收消息* @param: [topic, message]* @return: void**/@Overridepublic void messageArrived(String topic, MqttMessage message) throws MqttException {Payload payload = JsonUtils.parseObject(new String(message.getPayload()), Payload.class);log.info("接收到来自【{}】的消息【{}】", topic, payload.getTemperature());/*  if (payload.getTemperature() > 37) {publish("发烧");}*/}@Overridepublic void connectionLost(Throwable cause) {log.error("连接丢失:{}", cause.getMessage());}@SneakyThrows@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {if( token!=null ){MqttMessage message = null;try {message = token.getMessage();} catch (MqttException e) {throw new RuntimeException(e);}String topic = token.getTopics()==null ? null : Arrays.asList(token.getTopics()).toString();String str = message==null ? null : new String(message.getPayload());log.info("deliveryComplete: topic={}, message={}", topic, str);} else {log.info("deliveryComplete: null");}log.info("消息已送达");}@Overridepublic void connectComplete(boolean b, String s) {mqttProperties.getInTopics().forEach(x -> {try {mqttClient.subscribe(x.getTopic(), x.getQos());log.info("订阅主题{}", x.getTopic());} catch (MqttException e) {throw new RuntimeException(e);}});}
}

7.dao层

package com.wtzn.web.domain.bo;import lombok.Data;@Data
public class Payload {private Integer temperature;
}

三:测试

1.PostMan直接调用测试

2、下载MQTTX客户端进行测试

http://www.lryc.cn/news/582502.html

相关文章:

  • 恒创科技:香港站群服务器做seo站群优化效果如何
  • ReactNative【实战】瀑布流布局列表(含图片自适应、点亮红心动画)
  • Rust DevOps框架管理实例
  • ffmpeg下编译tsan
  • iOS 性能测试工具全流程:主流工具实战对比与适用场景
  • cocos2dx3.x项目升级到xcode15以上的iconv与duplicate symbols报错问题
  • CSP-S模拟赛二总结(实际难度大于CSP-S)
  • 力扣 239 题:滑动窗口最大值的两种高效解法
  • Android kotlin 协程的详细使用指南
  • C++--AVL树
  • 微前端框架对比
  • (16)Java+Playwright自动化测试-iframe操作-监听事件和执行js脚本
  • 精益管理与数字化转型的融合:中小制造企业降本增效的双重引擎
  • Nexus zkVM 3.0 及未来:迈向模块化、分布式的零知识证明
  • 生成PDF文件(基于 iText PDF )
  • Android framework修改解决偶发开机时有两个launcher入口的情况
  • Prompt Injection Attack to Tool Selection in LLM Agents
  • 论文略读:Prefix-Tuning: Optimizing Continuous Prompts for Generation
  • C++11标准库算法:深入理解std::find, std::find_if与std::find_if_not
  • Python中os.path和pathlib模块路径操作函数汇总
  • react的条件渲染【简约风5min】
  • C#使用Semantic Kernel实现Embedding功能
  • 【知足常乐ai笔记】机器人强化学习
  • TVS管工作原理是什么?主要的应用场景都有哪些?
  • MySQL数据库访问(C/C++)
  • 赛博威破解快消品渠道营销三重困局,助力企业实现“活动即战力”
  • 小米YU7预售现象深度解析:智能电动汽车的下一个范式革命
  • 内容页模板表格显示不全的问题处理
  • IP 能ping通,服务器是否开机?
  • 第8章:应用层协议HTTP、SDN软件定义网络、组播技术、QoS