当前位置: 首页 > news >正文

Docker Compose 构建 EMQX 集群 实现mqqt 和websocket

EMQX 集群化管理mqqt真香

目录

#目录 /usr/emqx

容器构建

vim docker-compose.yml

version: '3'services:emqx1:image: emqx:5.8.3container_name: emqx1environment:- "EMQX_NODE_NAME=emqx@node1.emqx.io"- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"healthcheck:test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]interval: 5stimeout: 25sretries: 5networks:emqx-bridge:aliases:- node1.emqx.ioports:- 1883:1883- 8083:8083- 8084:8084- 8883:8883- 18083:18083 # volumes:#   - $PWD/emqx1_data:/opt/emqx/dataemqx2:image: emqx:5.8.3container_name: emqx2environment:- "EMQX_NODE_NAME=emqx@node2.emqx.io"- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"healthcheck:test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]interval: 5stimeout: 25sretries: 5networks:emqx-bridge:aliases:- node2.emqx.io# volumes:#   - $PWD/emqx2_data:/opt/emqx/datanetworks:emqx-bridge:driver: bridge

启动

docker-compose up -d

集群状态

#查看集群状态
docker exec -it emqx1 sh -c "emqx ctl cluster status"#验证
telnet 192.168.0.15 1883
#内网
nc -zv  192.168.0.15 1883 #账户
admin
#默认密码
public

服务开放端口

1883,8083,8084,8883,18083

端口占用

EMQX 默认使用以下端口,请确保这些端口未被其他应用程序占用,并按照需求开放防火墙以保证 EMQX 正常运行。

端口协议描述
1883TCPMQTT over TCP 监听器端口,主要用于未加密的 MQTT 连接。
8883TCPMQTT over SSL/TLS 监听器端口,用于加密的 MQTT 连接。
8083TCPMQTT over WebSocket 监听器端口,使 MQTT 能通过 WebSocket 进行通信。
8084TCPMQTT over WSS (WebSocket over SSL) 监听器端口,提供加密的 WebSocket 连接。
18083HTTPEMQX Dashboard 和 REST API 端口,用于管理控制台和 API 接口。
4370TCPErlang 分布式传输端口,根据节点名称不同实际端口可能是 BasePort (4370) + Offset。
5370TCP集群 RPC 端口(在 Docker 环境下为 5369),根据节点名称不同实际端口可能是 BasePort (5370) + Offset。

前端js示

<!DOCTYPE html>
<html>
<head><title>MQTT WebSocket Test</title><script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
</head>
<body><script>// 使用提供的客户端 ID 或生成一个唯一 IDconst clientId = 'emqx_NjI4MT2';// 配置 WebSocket MQTT broker 地址const host = 'ws://127.0.0.1:8083/mqtt';// MQTT 连接选项const options = {keepalive: 60, // 心跳时间间隔clientId: clientId,protocolId: 'MQTT', // 协议 IDprotocolVersion: 5, // 使用 MQTT 5 协议clean: true, // 是否清除会话reconnectPeriod: 1000, // 重连间隔时间 (ms)connectTimeout: 30 * 1000, // 连接超时时间 (ms)username: 'admin', // 设置用户名password: 'public', // 设置密码will: {topic: 'pushRanking/1',payload: 'Connection Closed abnormally..!',qos: 0,retain: false},};console.log('Connecting mqtt client');// 连接到 MQTT Brokerconst client = mqtt.connect(host, options);// 连接成功回调client.on('connect', () => {console.log('Connected to MQTT broker');// 订阅主题 pushRanking/#,支持通配符client.subscribe('pushRanking/1', { qos: 0 }, (err) => {if (!err) {console.log('Subscribed to topic: pushRanking/1');} else {console.error('Failed to subscribe:', err);}});});// 处理接收到的消息client.on('message', (topic, message) => {console.log(`Received message from topic "${topic}": ${message.toString()}`);});// 连接错误回调client.on('error', (err) => {console.log('Connection error:', err);client.end();});// 重新连接回调client.on('reconnect', () => {console.log('Reconnecting...');});// 连接关闭回调client.on('close', () => {console.log('Connection closed');});// 模拟消息发布以测试接收setTimeout(() => {client.publish('pushRanking/1', JSON.stringify({ msg: 'hello' }), { qos: 0 });}, 5000);</script>
</body>
</html>

后端

package emqximport ("fmt"mqtt "github.com/eclipse/paho.mqtt.golang""testing""time"
)func TestMQTT(t *testing.T) {// 创建 EMQX 客户端实例client := NewEMQXClient("tcp://127.0.0.1:1883", "test-client", "admin", "QfTzLy3cop9NOGWj")// 连接到 EMQXif err := client.Connect(); err != nil {fmt.Printf("Failed to connect: %v\n", err)return}defer client.Disconnect()// 订阅主题client.Subscribe("testtopic/#", 1, func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Message received: %s\n", msg.Payload())})// 发布消息client.Publish("testtopic/1", 1, false, "Hello from Golang!")// 保持连接一段时间以接收消息time.Sleep(10 * time.Second)
}/*长连接的场景 DEMOfunc main() {client := emqxclient.NewEMQXClient("tcp://broker.emqx.io:1883", "test-client", "", "")if err := client.Connect(); err != nil {fmt.Printf("Failed to connect: %v\n", err)return}// 使用 defer 确保程序退出时断开连接defer client.Disconnect()// 订阅主题client.Subscribe("test/topic", 1, func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Received message: %s\n", msg.Payload())})// 发布消息client.Publish("test/topic", 1, false, "Hello from Golang!")// 捕获退出信号signalChan := make(chan os.Signal, 1)signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)fmt.Println("Running... Press Ctrl+C to exit.")<-signalChanfmt.Println("Exiting...")
}*//*JS 调用如下
import mqtt from 'mqtt';const brokerURL = 'ws://broker.emqx.io:8083/mqtt'; // WebSocket 连接地址
const clientID = `mqttjs_${Math.random().toString(16).substr(2, 8)}`;// 创建客户端
const client = mqtt.connect(brokerURL, {clientId: clientID,username: '', // 如需要认证,填入用户名password: '', // 如需要认证,填入密码
});// 连接事件
client.on('connect', () => {console.log('Connected to EMQX');// 订阅主题client.subscribe('test/topic', (err) => {if (!err) {console.log('Subscribed to topic: test/topic');} else {console.error('Failed to subscribe:', err);}});// 发布消息client.publish('test/topic', 'Hello from JavaScript!');
});// 接收消息事件
client.on('message', (topic, message) => {console.log(`Received message on topic "${topic}": ${message.toString()}`);
});// 错误事件
client.on('error', (err) => {console.error('Connection error:', err);
});*/

common封装调用

package emqximport ("fmt""time"mqtt "github.com/eclipse/paho.mqtt.golang"
)type EMQXClient struct {client mqtt.Client
}// NewEMQXClient 初始化 EMQX 客户端
func NewEMQXClient(broker string, clientID string, username string, password string) *EMQXClient {opts := mqtt.NewClientOptions().AddBroker(broker).SetClientID(clientID).SetUsername(username).SetPassword(password).SetKeepAlive(60 * time.Second).SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Received message on topic: %s, message: %s\n", msg.Topic(), msg.Payload())}).SetPingTimeout(1 * time.Second)client := mqtt.NewClient(opts)return &EMQXClient{client: client}
}// Connect 连接到 EMQX
func (c *EMQXClient) Connect() error {token := c.client.Connect()if token.Wait() && token.Error() != nil {return token.Error()}fmt.Println("Connected to EMQX broker")return nil
}// Publish 发布消息
func (c *EMQXClient) Publish(topic string, qos byte, retained bool, payload interface{}) error {token := c.client.Publish(topic, qos, retained, payload)token.Wait()return token.Error()
}// Subscribe 订阅主题
func (c *EMQXClient) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) error {token := c.client.Subscribe(topic, qos, callback)token.Wait()return token.Error()
}// Unsubscribe 取消订阅
func (c *EMQXClient) Unsubscribe(topics ...string) error {token := c.client.Unsubscribe(topics...)token.Wait()return token.Error()
}// Disconnect 断开连接
func (c *EMQXClient) Disconnect() {c.client.Disconnect(250)fmt.Println("Disconnected from EMQX broker")
}

http://www.lryc.cn/news/513250.html

相关文章:

  • Spring 过滤器:OncePerRequestFilter 应用详解
  • 3.CSS字体属性
  • 微信小程序 单选多选radio/checkbox 纯代码分享
  • k8s 部署meilisearch UI
  • gitlab 还原合并请求
  • ChatGPT最新版本“o3”的概要
  • uniapp——App下载文件,保存、打开文件(二)
  • Postman接口测试05|实战项目笔记
  • 【paddle】初次尝试
  • 01-2023年上半年软件设计师考试java真题解析
  • 一文讲清楚CSS3新特性
  • 系统设计案例:设计 Spotify
  • 太速科技-633-4通道2Gsps 14bit AD采集PCie卡
  • 图片叠加拖拽对比展示效果实现——Vue版
  • 结合长短期记忆网络(LSTM)和无迹卡尔曼滤波器(UKF)的技术在机器人导航和状态估计中的应用前景
  • 【MATLAB APP Designer】小波阈值去噪(第一期)
  • ClickHouse副本搭建
  • K3知识点
  • cocos creator 3.x版本如何添加打开游戏时首屏加载进度条
  • Fama MacBeth两步法与多因子模型的回归检验
  • IDEA 搭建 SpringBoot 项目之配置 Maven
  • node.js之---事件循环机制
  • Python OpenAI 库开发指南:从入门到实战精通
  • flash-attention保姆级安装教程
  • 送给一年编程道路的自己
  • LeRobot(1)
  • C++ 设计模式:组合模式(Composite Pattern)
  • OpenHarmony源码编译后烧录镜像教程,RK3566鸿蒙开发板演示
  • 强化学习(1)
  • 【漏洞复现】金和OA C6 FileDownLoad.aspx 任意文件读取漏洞复现