【物联网】mqtt初体验
文章目录
- 安装EMQX
- java集成
- 添加依赖
- mqtt配置参数
- 发布组件
- 订阅组件
- 测试接口
- 接口测试
最近在了解物联网云平台方面的知识,接触了mqtt协议,只看书籍难免有些枯燥,所以直接动手试验一下,便于巩固理论知识。
broker服务器操作系统:centos7
broker服务程序:EMQX
虚拟机IP地址:192.168.89.82
安装EMQX
在自己的虚拟机环境下进行安装,按照EMQX官方资料操作即可,步骤如下:
[root@centos7-82 ~]# cd /usr/src
[root@centos7-82 src]# wget https://www.emqx.com/zh/downloads/broker/5.0.17/emqx-5.0.17-el7-amd64.tar.gz
[root@centos7-82 src]# mkdir -p emqx
[root@centos7-82 src]# tar -zxvf emqx-5.0.17-el7-amd64.tar.gz -C emqx
[root@centos7-82 src]# ./emqx/bin/emqx start
启动后,控制台日志如下:
可以看下emqx端口:
浏览器访问地址如下:http://192.168.89.82:18083/ 默认的用户名密码为admin、public,第一次登录后会首先要求修改密码。
java集成
为了快速体验,直接在同一个springboot工程里同时实现发布客户端和订阅客户端。
添加依赖
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>
mqtt配置参数
mqtt.broker=tcp://192.168.89.82:1883
mqtt.username=admin
mqtt.password=admin
发布组件
@Component
public class PublishSample {private static final Logger log = LoggerFactory.getLogger(PublishSample.class);@Autowiredpublic MqttPropertiesConfig mqttPropertiesConfig;public void sentMsg(String content, String clientId, String topic, int qos){try {MqttClient mqttClient = new MqttClient(mqttPropertiesConfig.getBroker(), clientId, new MemoryPersistence());// 连接参数MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();// 设置用户名和密码mqttConnectOptions.setUserName(mqttPropertiesConfig.getUsername());mqttConnectOptions.setPassword(mqttPropertiesConfig.getPassword().toCharArray());mqttConnectOptions.setConnectionTimeout(60);mqttConnectOptions.setKeepAliveInterval(60);// 连接mqttClient.connect(mqttConnectOptions);// 创建消息并设置 QoSMqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);// 发布消息mqttClient.publish(topic, message);log.info("Message published");log.info("topic: {}", topic);log.info("message content: {}", content);// 关闭连接mqttClient.disconnect();// 关闭客户端mqttClient.close();} catch (MqttException e) {throw new RuntimeException(e);}}}
订阅组件
@Component
public class SubscribeSample {private static final Logger log = LoggerFactory.getLogger(SubscribeSample.class);@Autowiredpublic MqttPropertiesConfig mqttPropertiesConfig;public void subTest(String clientId, String topic, int qos){try {MqttClient client = new MqttClient(mqttPropertiesConfig.getBroker(), clientId, new MemoryPersistence());// 连接参数MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttPropertiesConfig.getUsername());options.setPassword(mqttPropertiesConfig.getPassword().toCharArray());options.setConnectionTimeout(60);options.setKeepAliveInterval(60);// 设置回调client.setCallback(new MqttCallback() {public void connectionLost(Throwable cause) {log.info("connectionLost: {}", cause.getMessage());}public void messageArrived(String topic, MqttMessage message) {log.info("topic: {}", topic);log.info("Qos: {}", message.getQos());log.info("message content: {}", new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {log.info("deliveryComplete---------{}", token.isComplete());}});client.connect(options);client.subscribe(topic, qos);} catch (Exception e) {e.printStackTrace();}}
}
测试接口
@RestController
public class TestController {

    private static final Logger log = LoggerFactory.getLogger(TestController.class);

    /** Topic shared by the publish endpoint and both subscribe endpoints. */
    private static final String TOPIC = "mqtt/test";

    /** QoS level used by every endpoint in this controller. */
    private static final int QOS = 0;

    @Autowired
    public PublishSample publishSample;

    @Autowired
    public SubscribeSample subscribeSample;

    /**
     * Publish endpoint: sends {@code content} to the shared topic.
     *
     * @param content request parameter holding the message payload
     * @return the literal string {@code "success"}
     */
    @RequestMapping("sent")
    public String sent(String content) {
        log.info("sent");
        publishSample.sentMsg(content, "lizx_pub_client1", TOPIC, QOS);
        return "success";
    }

    /**
     * Simulated subscriber client 1.
     *
     * @return the literal string {@code "success"}
     */
    @RequestMapping("sub1")
    public String sub1() {
        return subscribeAs("sub1", "lizx_sub_client1");
    }

    /**
     * Simulated subscriber client 2.
     *
     * @return the literal string {@code "success"}
     */
    @RequestMapping("sub2")
    public String sub2() {
        return subscribeAs("sub2", "lizx_sub_client2");
    }

    /** Logs the endpoint name, subscribes {@code clientId} to the shared topic and returns "success". */
    private String subscribeAs(String endpoint, String clientId) {
        log.info(endpoint);
        subscribeSample.subTest(clientId, TOPIC, QOS);
        return "success";
    }
}
接口测试
直接在浏览器中打开两个标签页,分别输入:
http://127.0.0.1:8080/sub1
http://127.0.0.1:8080/sub2
然后再打开一个标签页输入:
http://127.0.0.1:8080/sent?content=Hello%20MQTT
后台日志如下:
2023-02-14 16:19:39.970 INFO 5708 --- [nio-8080-exec-1] com.lizx.emqx.client.web.TestController : sub1
2023-02-14 16:19:46.651 INFO 5708 --- [nio-8080-exec-2] com.lizx.emqx.client.web.TestController : sub2
2023-02-14 16:19:50.384 INFO 5708 --- [nio-8080-exec-3] com.lizx.emqx.client.web.TestController : sent
2023-02-14 16:19:50.697 INFO 5708 --- [nio-8080-exec-3] c.lizx.emqx.client.sample.PublishSample : Message published
2023-02-14 16:19:50.698 INFO 5708 --- [nio-8080-exec-3] c.lizx.emqx.client.sample.PublishSample : topic: mqtt/test
2023-02-14 16:19:50.703 INFO 5708 --- [izx_sub_client2] c.l.emqx.client.sample.SubscribeSample : topic: mqtt/test
2023-02-14 16:19:50.703 INFO 5708 --- [izx_sub_client1] c.l.emqx.client.sample.SubscribeSample : topic: mqtt/test
2023-02-14 16:19:50.705 INFO 5708 --- [nio-8080-exec-3] c.lizx.emqx.client.sample.PublishSample : message content: Hello MQTT
2023-02-14 16:19:50.705 INFO 5708 --- [izx_sub_client2] c.l.emqx.client.sample.SubscribeSample : Qos: 0
2023-02-14 16:19:50.705 INFO 5708 --- [izx_sub_client1] c.l.emqx.client.sample.SubscribeSample : Qos: 0
2023-02-14 16:19:50.706 INFO 5708 --- [izx_sub_client2] c.l.emqx.client.sample.SubscribeSample : message content: Hello MQTT
2023-02-14 16:19:50.706 INFO 5708 --- [izx_sub_client1] c.l.emqx.client.sample.SubscribeSample : message content: Hello MQTT