当前位置: 首页 > news >正文

MQTT与服务器通讯

1. MQTT 3.1.1 概览

  • 协议定位:轻量级发布/订阅(Pub/Sub),长连接、低带宽、弱网络友好。

  • 角色

    • Broker(服务器):转发中心。

    • Client(客户端):发布者/订阅者(设备/APP/服务端)。

  • 连接:TCP(常见端口 1883),TLS 时 8883;协议名 "MQTT"、协议级别 4(=3.1.1)。

  • 消息模型Client PUBLISH → Broker → (根据订阅) → Clients

  • 核心特性:Topic 层级命名、QoS(0/1/2)、遗嘱(LWT)、保留消息(Retain)、会话(Clean Session/持久会话)、Keep Alive。


2. 主题(Topic)与通配符(非常重要)

  • Topic 示例
    factory/a101/dev/001/telemetry/temperature

  • **Topic Filter(订阅过滤器)**支持通配符,发布主题不允许通配符

    • +:匹配单层factory/+/dev/001/telemetry/#

    • #:匹配多层且只能放在最后一层factory/a101/#

  • 命名规范要点

    • 区分大小写;/分层;UTF-8 文本,不含 U+0000

    • 主题是字节序列,但默认视作 UTF-8;不同平台可能对非法 UTF-8 严格。

  • 最佳实践:用明确层级表达设备/功能/方向:

    • 上报:prod/<pid>/dev/<did>/up/prop

    • 下发:prod/<pid>/dev/<did>/down/cmd

:把 # 放中间、在 PUBLISH 里用 +/#、或大小写错位,都会直接导致“订阅不到/路由错”。


3. QoS 语义(0/1/2)与完整握手

  • QoS 0(最多一次):不确认,丢了就丢,最低延迟、最低开销。

  • QoS 1(至少一次)PUBLISHPUBACK;可能重复到达(需要去重)。

  • QoS 2(正好一次):四步握手 PUBLISH→PUBREC→PUBREL→PUBCOMP,最重最稳,延迟高。

序保证同一客户端同一订阅的同一 Topic 上,Broker 需按发送顺序投递(不同 Topic 间不保证顺序)。

注意点

  • QoS>0 的消息必须携带 Packet Identifier(16位,在“同一方向的在途”要唯一)。

  • 断线重连要重发未确认消息:重发时需置 DUP=1(库一般帮你做)。


4. 保留消息(Retain)

  • 发布时设置 retain=1,Broker 会把“该 Topic 的最后一条”存为保留消息,新订阅者一订阅立即收到当前值。

  • 常用于最新状态/配置

  • 清除方法:向相同 Topic 发一条空载荷+retain=1

  • 常见坑
    1)把一次性的“报警”也设成 retain,后来订阅的人会一直误收到旧报警。
    2)忘记清除,导致 UI 显示老状态。


5. 遗嘱(LWT, Last Will and Testament)

  • CONNECT 里声明:遗嘱主题 / 负载 / QoS / retain。

  • 当客户端非正常下线(如掉线、超时、被 Broker 断开)时,Broker 自动发布遗嘱

  • 常用模式:

    • 遗嘱:status/<dev> = {"online":false}retain=1

    • 上线后手动发布:status/<dev> = {"online":true}retain=1,覆盖遗嘱)

注意正常 DISCONNECT 不会触发遗嘱


6. 会话与 Clean Session

  • Clean Session=1(无状态/临时会话):断线后 Broker 不保留订阅和未投递消息;重连需重新 SUBSCRIBE

  • Clean Session=0(持久会话):Broker 记住订阅,QoS1/2 未投递消息会排队;重连后继续下发。

  • CONNACK.Session Present:服务端告知“是否存在旧会话”。

  • 空 ClientID(3.1.1):只有在 Clean Session=1 时服务端可以分配随机 ClientID(很多平台禁用,最好自己给唯一 ID)。

:Clean Session=1 却指望离线保留消息;或 ClientID 非唯一导致“前一个会话被顶掉”。


7. Keep Alive(心跳)与 PING

  • 客户端在 CONNECT 里声明秒数(如 60s)。在一个 KeepAlive 周期内必须向 Broker 发任意控制报文(PUBLISH 也算)——否则就发 PINGREQ

  • Broker 回 PINGRESP;超时会断开连接。

  • Keep Alive=0 表示不使用心跳(不建议)。

  • 嵌入式里用 RTOS 软件定时器滴答任务管理心跳更可靠。


8. 安全(3.1.1 本身不规定,但实际必须)

  • TLS(端口 8883)+ 证书校验(CA/服务端证书),避免明文泄露用户名/密码。

  • 用户名/密码在 CONNECT 里明文(无 TLS)。

  • 物联网平台(如 OneNET)通常有产品ID/设备名/鉴权约定:放在 ClientIDUsernamePassword 中的某些组合(按平台文档来)。

:只用 1883 明文 + 固定密码;时钟不准导致 TLS 校验失败(要先 SNTP)。


9. 控制报文(你会经常看到的)

  • 连接类:CONNECTCONNACKDISCONNECT

  • 数据类:PUBLISH(带 DUP/QoS/RETAIN 标志位)

  • 确认类:PUBACK/PUBREC/PUBREL/PUBCOMP

  • 订阅类:SUBSCRIBESUBACKUNSUBSCRIBEUNSUBACK

  • 心跳类:PINGREQPINGRESP

细节提醒SUBSCRIBE/UNSUBSCRIBE 的固定报头 QoS 位必须是 1(协议要求,库已处理)。


10. 负载(Payload)

  • 二进制透明;JSON/CSV/Protobuf/CBOR 都行。

  • 物模型/平台往往规定 JSON 格式;注意编码(UTF-8)与长度限制(很多 Broker 有最大报文长度/速率限制)。


11. 常见坑 & 设计清单

  1. 订阅过滤器写错:# 不在末尾、在 PUBLISH 里用了通配符。

  2. Retain 滥用,导致“鬼影状态”。

  3. ClientID 重复 → 老连接被踢;或被平台限长/限字符。

  4. Clean Session 用错:以为能离线堆积。

  5. QoS 1 重复消息未去重(以 PacketId + 业务消息ID 做幂等)。

  6. KeepAlive 太短/网络抖动大 → 频繁掉线;太长 → 故障发现慢。

  7. 未正确处理“重连 + 重新订阅”(Clean=1)或“老会话恢复”(Clean=0)。

  8. 遗嘱未用/未 retain,云端很难判断设备在线离线。

  9. 在 ISR 里做网络/内存重操作;或单个环形缓冲区上下文不安全。

  10. TLS 校验失败(没装 CA、时间错、SNI 不对)。

12. 一页速查(Cheat-Sheet)

  • 协议级别:4;控制报文:CONNECT/CONNACKPUBLISH(+PUBACK|PUBREC|PUBREL|PUBCOMP)、SUBSCRIBE/SUBACKUNSUBSCRIBE/UNSUBACKPINGREQ/PINGRESPDISCONNECT

  • QoS:0=最多一次1=至少一次(去重),2=正好一次(最稳最慢)

  • 遗嘱:非正常下线才触发;与“上线保留消息”配合用于在线状态

  • Retain:存“最后一条”,空载荷+retain 置空

  • Clean Session:1=临时(重连需重订阅),0=持久(可离线堆积 QoS1/2)

  • Keep Alive:建议 30~120s;心跳失败会断开

  • Topic:通配符只在订阅里;+ 单层,# 多层且末尾

报文

固定报头(Fixed Header)

  • 第1字节:高4位=报文类型,低4位=标志位(不同类型要求不同)

    • 例:SUBSCRIBE 的低4位必须是 0010b → 所以首字节 = 1000 0010b = 0x82

  • 剩余长度(Remaining Length):可变长度整型(VarInt,1~4字节),=“可变报头 + 负载”的总字节数

    • 编码:每字节低7位存数值,高1位为“后续字节标志”。

      • 例:321 → 321 % 128 = 65 → 65|0x80=0xC1;321/128=2(最后一字节)→ 0x02
        0xC1 0x02

变长字符串(UTF-8 Encoded String)

  • 2字节长度(大端) + 字节串

    • 例:"MQTT" → 长度4 = 00 04,后跟 4D 51 54 54

16位标识(Packet Identifier)

  • 大端:高字节在前

    • 例:0x002A00 2A


1)CONNECT(最小可用示例)

需求CleanSession=1KeepAlive=60,ClientID="prodA_dev001";无用户名/密码,无遗嘱(便于看清结构)。

计算

  • 固定报头首字节:CONNECT=类型1 → 0001 << 4 = 0x10;flag 必须 00000x10

  • 可变报头:

    • 协议名 "MQTT"00 04 4D 51 54 54(6字节)

    • 协议级别:04

    • 连接标志(Connect Flags):0000 0010b(CleanSession=1)→ 0x02

    • KeepAlive=60 → 00 3C

    • 小计= 6 + 1 + 1 + 2 = 10

  • 负载:

    • ClientID "prodA_dev001" 长度=12 → 00 0C + (70 72 6F 64 41 5F 64 65 76 30 30 31)

    • 小计= 14

  • 剩余长度 = 10 + 14 = 24 → 单字节 VarInt 0x18

完整HEX

10 18                                     # 固定报头: CONNECT, 剩余长度=0x18(24)
00 04 4D 51 54 54                         # 协议名 "MQTT"
04                                        # 协议级别=4 (MQTT 3.1.1)
02                                        # 连接标志: CleanSession=1
00 3C                                     # KeepAlive=60
00 0C 70 72 6F 64 41 5F 64 65 76 30 30 31 # ClientID "prodA_dev001"

若加遗嘱/用户名/密码:连接标志位相应置1,并在负载尾部追加对应的“长度+字符串”。

2)CONNACK(服务端回应)

最小成功示例:Session Present=0Return Code=0x00(成功)

20 02     # 固定报头: CONNACK(0x20), 剩余长度=2
00        # Session Present=0
00        # ReturnCode=0x00 (Accepted)

3)SUBSCRIBE(重点:固定报头的 QoS 位=1)

需求:订阅 prodA/dev001/down/#,请求QoS=1,报文标识 0x0010

计算

  • 首字节:类型8(SUBSCRIBE)→ 1000 <<4 = 0x80flags=00100x82

  • 可变报头:PacketId 00 10

  • 负载:TopicFilter 以UTF-8字符串编码 + 每个后面1字节请求QoS

    • 字符串 "prodA/dev001/down/#" 长度=1900 13
      ASCII:70 72 6F 64 41 2F 64 65 76 30 30 31 2F 64 6F 77 6E 2F 23

    • 请求QoS = 01

  • 剩余长度 = 2(PacketId) + (2+19+1) = 240x18

完整HEX

82 18
00 10
00 13 70 72 6F 64 41 2F 64 65 76 30 30 31 2F 64 6F 77 6E 2F 23 01

为什么首字节=0x82?
SUBSCRIBE 类型=8 → 高4位 1000b=0x8;固定要求 flags=0010b=0x2 → 合并成 0x82

4)SUBACK

假设同一 PacketId,且服务端同意 QoS1

90 03     # SUBACK, 剩余长度=3
00 10     # PacketId
01        # 返回码: 0x00/0x01/0x02代表授予的QoS0/1/2;0x80=失败

5)PUBLISH(QoS1 示例)

需求topic="prodA/dev001/up/temperature",载荷 {"temp":23.5},QoS=1,非保留、非DUP。PacketId=0x002A。

计算

  • 首字节:类型3 → 基数 0x30;QoS1 → +0x02;retain=0;dup=0 → 0x32

  • 可变报头:

    • TopicName 长度:
      "prodA/dev001/up/temperature"
      = 5 +1 +6 +1 +2 +1 +11 = 2700 1B
      后跟ASCII:70 72 6F 64 41 2F 64 65 76 30 30 31 2F 75 70 2F 74 65 6D 70 65 72 61 74 75 72 65

    • 因为 QoS1,要有 PacketId00 2A

    • 小计= 2+27 + 2 = 31

  • 负载:{"temp":23.5} 字节数=137B 22 74 65 6D 70 22 3A 32 33 2E 35 7D

  • 剩余长度 = 31 + 13 = 440x2C

完整HEX

32 2C
00 1B 70 72 6F 64 41 2F 64 65 76 30 30 31 2F 75 70 2F 74 65 6D 70 65 72 61 74 75 72 65
00 2A
7B 22 74 65 6D 70 22 3A 32 33 2E 35 7D

注意:QoS1 的服务端应答 PUBACK(见下一节)。QoS0 无 PacketId,也不会有 PUBACK。

6)PUBACK(对应上面的 PUBLISH QoS1)

40 02   # PUBACK, 剩余长度=2
00 2A   # PacketId

7)UNSUBSCRIBE / UNSUBACK(同样 flags=0010)

需求:退订同一过滤器,PacketId=0x0011。

计算

  • 首字节:类型10 → 0xA0;flags 00100xA2

  • 可变报头:00 11

  • 负载:仅 TopicFilter( QoS字节)
    "prodA/dev001/down/#" 长度=19 → 00 13 + (ASCII同上)

  • 剩余长度= 2 + (2+19) = 230x17

完整HEX

A2 17
00 11
00 13 70 72 6F 64 41 2F 64 65 76 30 30 31 2F 64 6F 77 6E 2F 23

UNSUBACK

B0 02
00 11

8)PINGREQ / PINGRESP、DISCONNECT(最简单)

# PINGREQ
C0 00# PINGRESP
D0 00# DISCONNECT
E0 00

9)保留消息(Retain)示例(对比看首字节)

需求:状态主题 status/prodA/dev001,载荷 {"online":true},QoS1,retain=1,PacketId=0x002B。

  • 首字节:类型3(0x30) + QoS1(0x02) + Retain(0x01) = 0x33

  • Topic 长度 "status/prodA/dev001" = 6+1+5+1+6=1900 13

  • 变头小计=2+19 +2=23;载荷 {"online":true} = 15

  • 剩余长度=23+15=380x26

完整HEX(关键是首字节0x33)

33 26
00 13 73 74 61 74 75 73 2F 70 72 6F 64 41 2F 64 65 76 30 30 31
00 2B
7B 22 6F 6E 6C 69 6E 65 22 3A 74 72 75 65 7D

10)如何“算得很准”:通用打包小函数(C思路)

// 变长整型编码(Remaining Length)
int encode_varint(unsigned char *buf, int value) {int i = 0;do {unsigned char byte = value % 128;value /= 128;if (value > 0) byte |= 0x80;  // 置后续位buf[i++] = byte;} while (value > 0);return i; // 返回写入字节数
}// 写UTF-8字符串: 2字节长度(大端) + 数据
int encode_utf8(unsigned char *buf, const char *s) {int n = (int)strlen(s);buf[0] = (n >> 8) & 0xFF;buf[1] = n & 0xFF;memcpy(buf+2, s, n);return 2 + n;
}// 写16位大端
static inline void put_u16(unsigned char *buf, uint16_t v) {buf[0] = (v >> 8) & 0xFF; buf[1] = v & 0xFF;
}

用这些基础砖块,就能“机械式”拼出上面所有报文:先拼可变报头+负载 → 算剩余长度 → 最后写固定报头首字节

11)容易错的点(再次确认)

  • SUBSCRIBE/UNSUBSCRIBE 固定报头标志=0010,不是 0000!否则 Broker 直接判错。

  • Remaining Length可变报头+负载之和,不包含固定报头自身。

  • 字符串均是 “2字节长度(大端) + 字节串”,不是 C 风格 \0 结尾。

  • QoS1/2 才有 Packet Identifier,QoS0 没有。

  • PUBLISH 首字节低四位:DUP(位3) | QoS(位1..2) | RETAIN(位0),组合别写反。

  • 大小端:全按网络字节序(大端)

  • Payload 是二进制透明的,不会自动加引号或\0

STM32 + 温湿度传感器 → 通过 ESP8266(WiFi 模块)→ 使用 MQTT 3.1.1 协议上传数据到 OneNET 平台。

main.c

#include "sys.h"        // 系统/时钟等基础配置头文件
#include "delay.h"      // 毫秒/微秒延时
#include "led.h"        // 板载LED控制
#include "uart1.h"      // USART1 打印调试
#include "dht11.h"      // DHT11 传感器采集接口
#include "esp8266.h"    // ESP8266 AT 驱动(透传/TCP)
#include "onenet.h"     // MQTT 打包/收发(面向 OneNET)int main(void)
{HAL_Init();                         /* 初始化 HAL 库(复位外设、Flash 接口、Systick) */stm32_clock_init(RCC_PLL_MUL9);     /* 设置系统时钟到 72MHz(HSE→PLL×9) */led_init();                         /* 初始化 LED GPIO(用于状态指示) */uart1_init(115200);                 /* 初始化 USART1=115200,用于打印调试 */esp8266_init(115200);               /* 初始化 ESP8266 串口与联网流程(STA 连接/AP、TCP、透传) */printf("hello world!\r\n");         /* 打印上电提示 */printf("MQTT初始化...\r\n");        /* 提示:即将初始化 MQTT 缓冲与登录参数 */mqtt_init();                        /* 初始化 MQTT 缓冲、登录参数,并发送一次 DISCONNECT 清理状态 */printf("MQTT连接...\r\n");          /* 提示:即将发起 MQTT CONNECT */mqtt_connect(MQTT_ClientID,         /* ClientID(在 onenet.c/mqtt_login_init 中填充) */MQTT_UserName,         /* Username */MQTT_PassWord);        /* Password */uint8_t data_send[512] = {0};       /* 上报 JSON 文本缓冲(发布载荷) */uint8_t dht11_data[4] = {0};        /* DHT11 原始数据:湿度整数/小数 + 温度整数/小数 */while(1){memset(dht11_data, 0, 4);               /* 清空传感器数据缓存 */dht11_read(dht11_data);                 /* 触发 DHT11 采集(阻塞式时序) *//* 组织 OneNET 物模型兼容的属性上报 JSON注意:%d.%d 拼接小数(DHT11 的 1/10 精度),负温度传感器不适用该写法 */sprintf((char *)data_send,"{\"id\":\"1386772172\",\"version\":\"1.0\",\"params\":{""\"CurrentTemperature\":{\"value\":%d.%d},""\"CurrentHumidity\":{\"value\":%d.%d}}}",dht11_data[2], dht11_data[3],   /* 温度 整数.小数 */dht11_data[0], dht11_data[1]);  /* 湿度 整数.小数 *//* 发布到属性上报主题(QoS=0)注意:若要云端可靠到达/回执,建议用 QoS=1,并处理 PUBACK */mqtt_publish_data(POST_TOPIC, (char *)data_send, 0);delay_ms(3000);                         /* 周期 3s 上报一次(仅示例) */printf("\r\n~~~~~~~~~~~~~~~~~发送心跳包~~~~~~~~~~~~~~~~~\r\n");mqtt_send_heart();                       /* 发送 PINGREQ(C0 00),用于维持 KeepAlive */printf("\r\n~~~~~~~~~~~~~~~~~发送心跳包结束~~~~~~~~~~~~~~~~~\r\n");}
}

dht11.h

#ifndef __DHT11_H__
#define __DHT11_H__#include "sys.h"  // 包含 HAL/GPIO 等基础类型定义/* DHT11 连接到 GPIOA.5 引脚(需根据实际硬件改) */
#define DHT11_PORT          GPIOA
#define DHT11_PIN           GPIO_PIN_5
#define DHT11_CLK_ENABLE()  __HAL_RCC_GPIOA_CLK_ENABLE()  // 使能 GPIOA 时钟/* 单线协议的输出控制:DHT11_DQ_OUT(x):输出电平;x为1输出高电平,0输出低电平 */
#define DHT11_DQ_OUT(x)     do{ x ? \HAL_GPIO_WritePin(DHT11_PORT, DHT11_PIN, GPIO_PIN_SET) : \HAL_GPIO_WritePin(DHT11_PORT, DHT11_PIN, GPIO_PIN_RESET);\}while(0)/* 读取引脚电平:返回 GPIO_PIN_SET 或 GPIO_PIN_RESET */
#define DHT11_DQ_IN         HAL_GPIO_ReadPin(DHT11_PORT, DHT11_PIN)/* 采集函数:读取 4 字节有效数据(湿度整数/小数、温度整数/小数),校验在 .c 中完成 */
void dht11_read(uint8_t *result);#endif

dht11.c

#include "dht11.h"   // 本模块头文件
#include "delay.h"   // 微秒/毫秒延时
#include "string.h"  // memcpy
#include "stdio.h"   // printf/* DHT11 接收的 5 字节原始数据:H_int, H_dec, T_int, T_dec, Checksum这里用 char 定义,但更建议用 uint8_t(无符号),以免负数扩展问题 */
char dht11_data[5] = {0};/* 将引脚配置为输入模式:DHT11 数据线释放后由传感器驱动 */
void dht11_gpio_input(void)
{GPIO_InitTypeDef gpio_initstruct;DHT11_CLK_ENABLE();                          /* 使能端口时钟 */gpio_initstruct.Pin = DHT11_PIN;             /* 选择 DHT11 所在引脚 */gpio_initstruct.Mode = GPIO_MODE_INPUT;      /* 输入模式(浮空) */gpio_initstruct.Speed = GPIO_SPEED_FREQ_HIGH;/* 输出速度字段此处无效,可忽略 */HAL_GPIO_Init(DHT11_PORT, &gpio_initstruct); /* 初始化 GPIO */
}/* 将引脚配置为推挽输出:用于主机拉低起始信号 */
void dht11_gpio_output(void)
{GPIO_InitTypeDef gpio_initstruct;DHT11_CLK_ENABLE();                          /* 使能端口时钟 */gpio_initstruct.Pin = DHT11_PIN;             /* 选择 DHT11 所在引脚 */gpio_initstruct.Mode = GPIO_MODE_OUTPUT_PP;  /* 推挽输出 */gpio_initstruct.Speed = GPIO_SPEED_FREQ_HIGH;/* 高速(影响上升沿) */HAL_GPIO_Init(DHT11_PORT, &gpio_initstruct); /* 初始化 GPIO */
}/* 起始时序:主机拉低约 >=18ms,随后拉高并切换为输入,等待 DHT11 的响应脉冲 */
void dht11_start(void)
{dht11_gpio_output();   /* 切换为输出,主机驱动数据线 */DHT11_DQ_OUT(1);       /* 先拉高,确保起始状态 */DHT11_DQ_OUT(0);       /* 拉低,启动信号 */delay_ms(20);          /* 维持至少 18ms,保证 DHT11 识别 */DHT11_DQ_OUT(1);       /* 拉高,准备接收响应 */dht11_gpio_input();    /* 释放总线,切为输入等待传感器响应 */while(DHT11_DQ_IN);      /* 等待 DHT11 拉低(80us 左右),若卡死需加超时保护 */while(!DHT11_DQ_IN);     /* 等待 DHT11 拉高(80us 左右) */while(DHT11_DQ_IN);      /* 等待 DHT11 再次拉低,进入数据位输出阶段 */
}/* 读取 1 字节(8 位):每位的“高电平持续时间”决定位值:~26-28us 为 0,~70us 为 1 */
uint8_t dht11_read_byte(void)
{uint8_t temp = 0;        /* 暂存每一位 */uint8_t i = 0;           /* 循环计数 */uint8_t read_data = 0;   /* 返回的 8 位数据 */for(i = 0; i < 8; i++){while(!DHT11_DQ_IN);   /* 等待拉高的起始(每位起始低电平 50us) */delay_us(50);          /* 延时 50us 后采样电平(>26us <70us 的分界) */if(DHT11_DQ_IN == 1)   /* 若仍为高电平,判定该位为 1 */{temp = 1;while(DHT11_DQ_IN);/* 等待该位结束拉低,进入下一位 */}else{temp = 0;          /* 采样到低电平,判定该位为 0 */}read_data = read_data << 1; /* 左移一位,为新位腾位置 */read_data |= temp;          /* 叠加本位 */}return read_data; /* 返回 8 位数据 */
}/* 读取 5 字节数据,校验通过后将前 4 字节拷贝到 resultresult[0..3] = {H_int, H_dec, T_int, T_dec} */
void dht11_read(uint8_t *result)
{uint8_t i = 0;dht11_start();        /* 发送起始并等待响应 */dht11_gpio_input();   /* 确保为输入状态,开始逐字节读取 */for(i = 0; i < 5; i++)dht11_data[i] = dht11_read_byte();  /* 连续读取 5 字节 *//* 简单校验:前四字节求和 == 第五字节 */if(dht11_data[0] + dht11_data[1] + dht11_data[2] + dht11_data[3] == dht11_data[4]){memcpy(result, dht11_data, 4);  /* 拷贝有效的 4 字节返回 */printf("湿度:%d.%dRH ,", dht11_data[0], dht11_data[1]);     /* 打印湿度 */printf("温度:%d.%d℃\r\n", dht11_data[2], dht11_data[3]);   /* 打印温度 */}delay_ms(2000);  /* 采样间隔(DHT11 建议 ≥1s),此处 2s */
}

esp8266.h

#ifndef __ESP8266_H__
#define __ESP8266_H__#include "sys.h"   // HAL/UART 等基础定义/* 串口缓冲区大小(按AT回应长度酌情调大)注意:接收缓冲过小会截断返回,影响匹配 */
#define ESP8266_RX_BUF_SIZE         128
#define ESP8266_TX_BUF_SIZE         64/* 返回码(EOK=成功,ERROR=失败等) */
#define ESP8266_EOK                 0
#define ESP8266_ERROR               1
#define ESP8266_ETIMEOUT            2
#define ESP8266_EINVAL              3/* WiFi 工作模式 */
#define ESP8266_STA_MODE            1
#define ESP8266_AP_MODE             2
#define ESP8266_STA_AP_MODE         3/* 连接模式(单/多连接) */
#define ESP8266_SINGLE_CONNECTION   0
#define ESP8266_MULTI_CONNECTION    1/* WiFi AP 参数(按需修改) */
#define WIFI_SSID                   "HuaweiAP-1ED0_Guest"
#define WIFI_PWD                    "gcc11111111"/* MQTT 服务器(域名+端口)注意:你当前是明文 TCP(AT+CIPSTART "TCP"...),主机名叫 mqtts.heclouds.com 通常用于 TLS(SSL) 8883 端口;若 1883 不允许明文,将连接失败。 */
#define TCP_SERVER_IP               "mqtts.heclouds.com"
#define TCP_SERVER_PORT             "1883"/* 对外API */
void esp8266_init(uint32_t baudrate);                 // 初始化UART与联网流程
void esp8266_receive_data(void);                      // 调试打印接收串
void esp8266_send_data(char *data, uint16_t len);     // 发送任意数据
uint16_t esp8266_copy_rxdata(char *data);             // 复制当前接收缓冲到外部
uint8_t esp8266_wait_receive(void);                   // 轮询式“接收完成”判断#endif

esp8266.c

#include "esp8266.h"     // ESP8266 头文件
#include "stdio.h"       // printf
#include "string.h"      // memset/memcpy/strstr
#include "delay.h"       // 延时
#include "stdarg.h"      // 可变参(本文件未使用,可移除)/* 环形/线性接收与发送缓冲 */
uint8_t esp8266_rx_buf[ESP8266_RX_BUF_SIZE];
uint8_t esp8266_tx_buf[ESP8266_TX_BUF_SIZE];
uint16_t esp8266_cnt = 0, esp8266_cntPre = 0;  // 当前接收计数与上一次判定计数/* USART2 句柄(连接到 ESP8266) */
UART_HandleTypeDef esp8266_handle = {0};/* 初始化 USART2:波特率来自形参 */
void esp8266_uart_init(uint32_t baudrate)
{esp8266_handle.Instance = USART2;                  /* 使用 USART2 */esp8266_handle.Init.BaudRate = baudrate;           /* 配置波特率 */esp8266_handle.Init.WordLength = UART_WORDLENGTH_8B;/* 8位数据位 */esp8266_handle.Init.StopBits = UART_STOPBITS_1;    /* 1位停止位 */esp8266_handle.Init.Parity = UART_PARITY_NONE;     /* 无校验 */esp8266_handle.Init.HwFlowCtl = UART_HWCONTROL_NONE;/* 无硬件流控 */esp8266_handle.Init.Mode = UART_MODE_TX_RX;        /* 收发模式 */HAL_UART_Init(&esp8266_handle);                    /* 初始化串口 */
}/* USART2 中断服务:读取 1 字节并放入接收缓冲注意:在中断里调用 HAL_UART_Receive 是阻塞+较重的做法,建议直接读 DR 寄存器或用 HAL_UART_Receive_IT/DMA */
void USART2_IRQHandler(void)
{uint8_t receive_data = 0;                                        /* 暂存接收到的字节 */if(__HAL_UART_GET_FLAG(&esp8266_handle, UART_FLAG_RXNE) != RESET)/* 判断接收缓冲非空标志 */{if(esp8266_cnt >= sizeof(esp8266_rx_buf))                    /* 防止越界:满则回绕 */esp8266_cnt = 0;HAL_UART_Receive(&esp8266_handle, &receive_data, 1, 1000);   /* 读取1字节(阻塞) */esp8266_rx_buf[esp8266_cnt++] = receive_data;                /* 存入接收缓冲 *//* 若要透传到调试串口,可在此处转发到 UART1 */}
}/* 轮询式判断“接收结束”:思路:连续两次读取计数相同,认为 1 帧接收完成(简单超时法) */
uint8_t esp8266_wait_receive(void)
{if(esp8266_cnt == 0)                     /* 还未收到任何字节 */return ESP8266_ERROR;if(esp8266_cnt == esp8266_cntPre)        /* 计数未变化 → 认为一帧已收完 */{esp8266_cnt = 0;                     /* 清零计数,为下一帧做准备 */return ESP8266_EOK;                  /* 返回成功 */}esp8266_cntPre = esp8266_cnt;            /* 记录本次计数,等待下一轮比较 */return ESP8266_ERROR;                    /* 暂未完成 */
}/* 清空接收缓冲与计数 */
void esp8266_rx_clear(void)
{memset(esp8266_rx_buf, 0, sizeof(esp8266_rx_buf));/* 清空内容 */esp8266_cnt = 0;                                   /* 清空计数 */
}/* 调试:若侦测到一帧结束则打印内容并清空 */
void esp8266_receive_data(void)
{if(esp8266_wait_receive() == ESP8266_EOK)  /* 判断接收完成 */{printf("esp8266 recv: %s\r\n", esp8266_rx_buf);/* 打印 AT 回应 */esp8266_rx_clear();                              /* 清空缓冲 */}
}/* 发送任意数据(不附加 \r\n) */
void esp8266_send_data(char *data, uint16_t len)
{esp8266_rx_clear();                                           /* 发送前清空接收缓冲(避免混淆上一次回应) */HAL_UART_Transmit(&esp8266_handle, (unsigned char *)data,     /* 串口发送 */len, 100);
}/* 将当前接收缓冲复制到外部缓冲,返回长度(上一帧长度) */
uint16_t esp8266_copy_rxdata(char *data)
{memcpy(data, esp8266_rx_buf, esp8266_cntPre);  /* 复制最近一次“稳定”的数据 */return esp8266_cntPre;                         /* 返回长度 */
}/* 发送 AT 命令并等待包含关键字 res 的回应,使用“超时+帧静止”策略 */
uint8_t esp8266_send_command(char *cmd, char *res)
{uint8_t time_out = 250;                                  /* 轮询次数(×10ms)约 2.5s */esp8266_rx_clear();                                      /* 清空缓冲 */HAL_UART_Transmit(&esp8266_handle, (uint8_t *)cmd,       /* 发送命令 */strlen(cmd), 100);while(time_out--){if(esp8266_wait_receive() == ESP8266_EOK)            /* 一帧稳定 */{if(strstr((const char*)esp8266_rx_buf, res) != NULL) /* 匹配期望子串 */return ESP8266_EOK;                          /* 成功 */}delay_ms(10);                                        /* 间隔 10ms 轮询 */}return ESP8266_ERROR;                                    /* 超时失败 */
}/* AT 测试:期待 "OK" */
uint8_t esp8266_at_test(void)
{return esp8266_send_command("AT\r\n", "OK");
}/* 设置工作模式:1=STA 2=AP 3=STA+AP */
uint8_t esp8266_set_mode(uint8_t mode)
{switch(mode){case ESP8266_STA_MODE:return esp8266_send_command("AT+CWMODE=1\r\n", "OK");case ESP8266_AP_MODE:return esp8266_send_command("AT+CWMODE=2\r\n", "OK");case ESP8266_STA_AP_MODE:return esp8266_send_command("AT+CWMODE=3\r\n", "OK");default:return ESP8266_EINVAL;  /* 非法参数 */}
}/* 连接 AP:AT+CWJAP="ssid","pwd"返回关键字等待 "WIFI GOT IP" */
uint8_t esp8266_join_ap(char *ssid, char *pwd)
{char cmd[64];sprintf(cmd, "AT+CWJAP=\"%s\",\"%s\"\r\n", ssid, pwd); /* 拼接指令 */return esp8266_send_command(cmd, "WIFI GOT IP");       /* 连接并等待拿到 IP */
}/* 设置连接模式(单/多) */
uint8_t esp8266_connection_mode(uint8_t mode)
{char cmd[64];sprintf(cmd, "AT+CIPMUX=%d\r\n", mode);        /* 0 单路,1 多路 */return esp8266_send_command(cmd, "OK");
}/* 连接 TCP 服务器:AT+CIPSTART="TCP","host",port注意:若要 TLS,应使用 "SSL" 并确保模组固件支持与证书配置 */
uint8_t esp8266_connect_tcp_server(char *server_ip, char *server_port)
{char cmd[64];sprintf(cmd, "AT+CIPSTART=\"TCP\",\"%s\",%s\r\n", server_ip, server_port); /* 明文 TCP */return esp8266_send_command(cmd, "CONNECT");                                /* 等待 CONNECT */
}/* 进入透传:CIPMODE=1 + CIPSEND之后串口数据会直接发往 TCP(直连 MQTT 报文) */
uint8_t esp8266_enter_unvarnished(void)
{uint8_t ret;ret = esp8266_send_command("AT+CIPMODE=1\r\n", "OK");  /* 开启透传模式 */ret += esp8266_send_command("AT+CIPSEND\r\n", ">");    /* 进入发送态,等待 '>' 提示 */if (ret == ESP8266_EOK)return ESP8266_EOK;                                /* 成功 */elsereturn ESP8266_ERROR;                              /* 失败 */
}/* 综合初始化流程:1) 配置串口2) AT 测试3) 设置 STA4) 单链接5) 连接 WiFi6) 连接 TCP 服务器7) 进入透传 */
void esp8266_init(uint32_t baudrate)
{printf("esp8266初始化开始...\r\n");esp8266_uart_init(baudrate);printf("1. 测试esp8266是否存在...\r\n");while(esp8266_at_test())                    /* 返回非0表示失败,循环重试 */delay_ms(500);printf("2. 设置工作模式为STA...\r\n");while(esp8266_set_mode(ESP8266_STA_MODE))   /* 设置为 STA */delay_ms(500);printf("3. 设置单路链接模式...\r\n");while(esp8266_connection_mode(ESP8266_SINGLE_CONNECTION)) /* 单连接 */delay_ms(500);printf("4. 连接wifi,SSID: %s, PWD: %s\r\n", WIFI_SSID, WIFI_PWD);while(esp8266_join_ap(WIFI_SSID, WIFI_PWD)) /* 加入 AP 并等待拿到 IP */delay_ms(1500);printf("5. 连接TCP服务器,server_ip:%s, server_port:%s\r\n", TCP_SERVER_IP, TCP_SERVER_PORT);while(esp8266_connect_tcp_server(TCP_SERVER_IP, TCP_SERVER_PORT)) /* 与 MQTT 服务器建 TCP 连接 */delay_ms(500);printf("6. 进入到透传模式...\r\n");while(esp8266_enter_unvarnished())          /* 进入透传 */delay_ms(500);printf("ESP8266已连接上TCP服务器并进入透传模式\r\n");printf("ESP8266初始化完成!\r\n");
}

onenet.h

#ifndef _ONENET_H_
#define _ONENET_H_#include "string.h"  // strlen/memcpy/sprintf
#include "stdio.h"   // printf
#include "stdlib.h"  // 标准库
#include "stdarg.h"  // 可变参(本头中未用,可移除)
#include "delay.h"   // delay_ms/* 小端平台上通过字节指针访问 32位整型的各字节(注意对齐/别越界)用于写入 MQTT 报文的 16位长度(BYTE1 高字节,BYTE0 低字节) */
#define BYTE0(dwTemp)       (*( char *)(&dwTemp))
#define BYTE1(dwTemp)       (*((char *)(&dwTemp) + 1))
#define BYTE2(dwTemp)       (*((char *)(&dwTemp) + 2))
#define BYTE3(dwTemp)       (*((char *)(&dwTemp) + 3))/* 三个 MQTT 登录字段,由 mqtt_login_init() 生成 */
extern char MQTT_ClientID[100]; // ClientID
extern char MQTT_UserName[100]; // Username
extern char MQTT_PassWord[200]; // Password/* MQTT 收包解析后的结构体(最多 512B 主题/载荷) */
typedef struct
{uint8_t topic[512];     /* 主题缓冲 */uint16_t topic_len;     /* 主题长度 */uint8_t payload[512];   /* 载荷缓冲 */uint16_t payload_len;   /* 载荷长度 */
} Mqtt_RxData_Type;/* OneNET 设备证书(示例用,按你的实际项目配置) */
#define PRODUCT_KEY "pC0uTV161W"
#define DEVICE_NAME "dht11_01"
#define DEVICE_SECRET "75AKO7FD5KBEuSJ6BTDLPFC227w%3D"  /* 这里看起来像 URL 编码后的密钥 *//* 主题(OneNET Studio 物模型标准主题) */
#define RELY_PUBLISH_TOPIC  "$sys/pC0uTV161W/dht11_01/thing/property/set_reply"  /* 属性设置应答发布主题 */
#define SET_TOPIC           "$sys/pC0uTV161W/dht11_01/thing/property/set"        /* 属性设置下行主题(订阅) */
#define POST_TOPIC          "$sys/pC0uTV161W/dht11_01/thing/property/post"       /* 属性上报发布主题 *//* 事件上报主题(示例) */
#define EVENT_PUBLISH_TOPIC "$sys/pC0uTV161W/dht11_01/thing/event/post"/* 登录字段生成(此处混合了“阿里云风格注释”和 OneNET 令牌参数,请以你平台为准) */
void mqtt_login_init(char *ProductKey,char *DeviceName,char *DeviceSecret);/* MQTT 相关 API(打包与发送、订阅、连接等) */
uint8_t mqtt_publish_data(char *topic, char *message, uint8_t qos);
uint8_t mqtt_subscribe_topic(char *topic,uint8_t qos,uint8_t whether);
void mqtt_init(void);
uint8_t mqtt_connect(char *ClientID,char *Username,char *Password);
void mqtt_send_heart(void);
void mqtt_disconnect(void);
void mqtt_send_data(uint8_t *buf,uint16_t len);
void mqtt_send_response(uint8_t *id);
uint8_t mqtt_receive_handle(uint8_t *data_received, Mqtt_RxData_Type *rx_data);#endif

onenet.c

#include "onenet.h"     // 对应头文件
#include "esp8266.h"    // 发送底层通过 esp8266_send_data/* 三个登录字段缓冲(CONNECT 载荷使用) */
char MQTT_ClientID[100]; // MQTT ClientID
char MQTT_UserName[100]; // MQTT Username
char MQTT_PassWord[200]; // MQTT Password/* MQTT 发送/接收缓冲(打包/解析) */
uint8_t *mqtt_rxbuf;       /* 指向接收缓存区 */
uint8_t *mqtt_txbuf;       /* 指向发送缓存区 */
uint16_t mqtt_rxlen;       /* 接收区长度 */
uint16_t mqtt_txlen;       /* 发送区长度 */
uint8_t _mqtt_txbuf[512];  /* 实际发送缓冲 */
uint8_t _mqtt_rxbuf[512];  /* 实际接收缓冲 *//* MQTT 报文类型枚举(与固定报头类型值一致) */
typedef enum
{M_RESERVED1 = 0,    /* 保留 */M_CONNECT,          /* 1  CONNECT */M_CONNACK,          /* 2  CONNACK */M_PUBLISH,          /* 3  PUBLISH */M_PUBACK,           /* 4  PUBACK  (QoS1) */M_PUBREC,           /* 5  PUBREC  (QoS2 步骤1) */M_PUBREL,           /* 6  PUBREL  (QoS2 步骤2) */M_PUBCOMP,          /* 7  PUBCOMP (QoS2 步骤3) */M_SUBSCRIBE,        /* 8  SUBSCRIBE */M_SUBACK,           /* 9  SUBACK */M_UNSUBSCRIBE,      /* 10 UNSUBSCRIBE */M_UNSUBACK,         /* 11 UNSUBACK */M_PINGREQ,          /* 12 PINGREQ */M_PINGRESP,         /* 13 PINGRESP */M_DISCONNECT,       /* 14 DISCONNECT */M_RESERVED2,        /* 15 保留 */
}_typdef_mqtt_message;/* 常见固定报文(HEX 模版)连接成功回应通常是 20 02 00 00 (CONNACK) */
const uint8_t parket_connetAck[] = {0x20,0x02,0x00,0x00};  /* CONNACK: session present=0, return code=0 */
const uint8_t parket_disconnet[] = {0xe0,0x00};            /* DISCONNECT */
const uint8_t parket_heart[]    = {0xc0,0x00};            /* PINGREQ */
const uint8_t parket_heart_reply[] = {0xc0,0x00};         /* ★注意:PINGRESP 应为 D0 00,这里写成 C0 00(PINGREQ),未使用可忽略 */
const uint8_t parket_subAck[]   = {0x90,0x03};            /* SUBACK 固定开头(后续含 packet id+返回码) *//* 生成 MQTT 登录字段(ClientID/Username/Password)这里采用 OneNET 的 token 样式(version/res/et/method/sign),具体以你平台文档为准 */
void mqtt_login_init(char *ProductKey,char *DeviceName,char *DeviceSecret)
{/* 直接将设备名作为 ClientID(需唯一) */sprintf(MQTT_ClientID,"%s", DeviceName);/* 将产品ID作为 Username(部分平台要求 Username=产品ID 或 设备名&产品ID) */sprintf(MQTT_UserName,"%s", ProductKey);/* 生成 Password:这里是 OneNET 的 token 形式(举例),sign=DEVICE_SECRET(通常应是计算后的签名)。注意:你的 DEVICE_SECRET 宏看起来像 URL 编码后的字符串。 */sprintf(MQTT_PassWord,"version=2018-10-31&res=products%%2F%s%%2Fdevices%%2F%s&et=2017881776&method=sha1&sign=%s",ProductKey, DeviceName, DEVICE_SECRET);
}/* 初始化 MQTT 缓冲与登录参数,并尝试发送 DISCONNECT 清理残留会话 */
void mqtt_init(void)
{mqtt_login_init(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);  /* 填充 ClientID/Username/Password *//* 绑定内部缓冲到指针,记录长度,清零内容 */mqtt_rxbuf = _mqtt_rxbuf;mqtt_rxlen = sizeof(_mqtt_rxbuf);mqtt_txbuf = _mqtt_txbuf;mqtt_txlen = sizeof(_mqtt_txbuf);memset(mqtt_rxbuf,0,mqtt_rxlen);memset(mqtt_txbuf,0,mqtt_txlen);/* 先发两次 DISCONNECT,防止上电复位后服务器仍保持旧连接状态 */mqtt_disconnect();delay_ms(100);mqtt_disconnect();delay_ms(100);
}/* 发送 CONNECT 报文并等待 CONNACK返回 0=成功,1=失败 */
uint8_t mqtt_connect(char *ClientID,char *Username,char *Password)
{uint8_t j;                                    /* 轮询计数 */int ClientIDLen = strlen(ClientID);           /* ClientID 长度 */int UsernameLen = strlen(Username);           /* Username 长度 */int PasswordLen = strlen(Password);           /* Password 长度 */int DataLen;                                  /* 剩余长度:可变报头 + 负载 */mqtt_txlen=0;                                 /* 发送缓冲写指针清零 *//* 计算剩余长度:可变报头10字节 + ClientID(2+Len) + Username(2+Len) + Password(2+Len) */DataLen = 10 + (ClientIDLen+2) + (UsernameLen+2) + (PasswordLen+2);/* 固定报头:CONNECT = 0x10 */mqtt_txbuf[mqtt_txlen++] = 0x10;/* 剩余长度编码(VarInt,可能 1~4 字节) */do{uint8_t encodedByte = DataLen % 128;      /* 取低 7 位 */DataLen = DataLen / 128;                  /* 除以 128 */if ( DataLen > 0 )                        /* 若后续还有字节,将最高位设 1 */encodedByte = encodedByte | 128;mqtt_txbuf[mqtt_txlen++] = encodedByte;   /* 写入一个字节 */}while ( DataLen > 0 );/* 可变报头:协议名 "MQTT" */mqtt_txbuf[mqtt_txlen++] = 0;                 /* 协议名长度 MSB */mqtt_txbuf[mqtt_txlen++] = 4;                 /* 协议名长度 LSB */mqtt_txbuf[mqtt_txlen++] = 'M';mqtt_txbuf[mqtt_txlen++] = 'Q';mqtt_txbuf[mqtt_txlen++] = 'T';mqtt_txbuf[mqtt_txlen++] = 'T';/* 协议级别:3.1.1 = 4 */mqtt_txbuf[mqtt_txlen++] = 4;/* 连接标志(Connect Flags):0xC2 = 1100 0010bbit7 username flag=1、bit6 password flag=1、bit1 clean session=1will flag/retain/qos 均为 0(无遗嘱) */mqtt_txbuf[mqtt_txlen++] = 0xc2;/* KeepAlive:100 秒(0x0064) */mqtt_txbuf[mqtt_txlen++] = 0;      /* KeepAlive MSB */mqtt_txbuf[mqtt_txlen++] = 100;    /* KeepAlive LSB *//* 负载:ClientID(2字节长度+内容) */mqtt_txbuf[mqtt_txlen++] = BYTE1(ClientIDLen);  /* 长度高字节 */mqtt_txbuf[mqtt_txlen++] = BYTE0(ClientIDLen);  /* 长度低字节 */memcpy(&mqtt_txbuf[mqtt_txlen], ClientID, ClientIDLen); /* 拷贝字符串 */mqtt_txlen += ClientIDLen;/* 负载:Username(可选) */if(UsernameLen > 0){mqtt_txbuf[mqtt_txlen++] = BYTE1(UsernameLen);mqtt_txbuf[mqtt_txlen++] = BYTE0(UsernameLen);memcpy(&mqtt_txbuf[mqtt_txlen], Username, UsernameLen);mqtt_txlen += UsernameLen;}/* 负载:Password(可选) */if(PasswordLen > 0){mqtt_txbuf[mqtt_txlen++] = BYTE1(PasswordLen);mqtt_txbuf[mqtt_txlen++] = BYTE0(PasswordLen);memcpy(&mqtt_txbuf[mqtt_txlen], Password, PasswordLen);mqtt_txlen += PasswordLen;}/* 发送 CONNECT */memset(mqtt_rxbuf,0,mqtt_rxlen);               /* 清空接收缓冲 */mqtt_send_data(mqtt_txbuf,mqtt_txlen);         /* 透传发送整个报文 *//* 简单轮询等待 CONNACK(最多 10*50ms) */for(j=0;j<10;j++){delay_ms(50);if (esp8266_wait_receive() == ESP8266_EOK)         /* 一帧稳定 */esp8266_copy_rxdata((char *)mqtt_rxbuf);       /* 复制到 mqtt_rxbuf *//* 匹配开头 20 02 00,即 CONNACK + return code=0 */if(mqtt_rxbuf[0]==parket_connetAck[0] &&mqtt_rxbuf[1]==parket_connetAck[1] &&mqtt_rxbuf[2]==parket_connetAck[2]){return 0; /* 连接成功 */}}return 1; /* 超时/失败 */
}/* 订阅/取消订阅打包并发送topic=主题字符串qos  =请求的 QoS(仅订阅报文需要附带)whether=1 订阅(SUBSCRIBE),0 取消订阅(UNSUBSCRIBE)返回 0 成功,1 失败 */
uint8_t mqtt_subscribe_topic(char *topic,uint8_t qos,uint8_t whether)
{uint8_t j;                     /* 轮询计数 */mqtt_txlen=0;                  /* 清零发送写指针 */int topiclen = strlen(topic);  /* 主题长度 *//* 剩余长度:可变报头2字节(packet id) + 负载(2+topiclen [+1个qos]) */int DataLen = 2 + (topiclen+2) + (whether?1:0);/* 固定报头第一字节:SUBSCRIBE 必须 0x82;UNSUBSCRIBE 必须 0xA2(flags=0010) */if(whether) mqtt_txbuf[mqtt_txlen++] = 0x82; /* SUBSCRIBE */else        mqtt_txbuf[mqtt_txlen++] = 0xA2; /* UNSUBSCRIBE *//* 剩余长度 VarInt 编码 */do{uint8_t encodedByte = DataLen % 128;DataLen = DataLen / 128;if ( DataLen > 0 )encodedByte = encodedByte | 128;mqtt_txbuf[mqtt_txlen++] = encodedByte;}while ( DataLen > 0 );/* 可变报头:Packet Identifier(这里固定用 0x0001) */mqtt_txbuf[mqtt_txlen++] = 0;      /* MSB */mqtt_txbuf[mqtt_txlen++] = 0x01;   /* LSB *//* 负载:主题(2字节长度 + 主题字符串) */mqtt_txbuf[mqtt_txlen++] = BYTE1(topiclen);mqtt_txbuf[mqtt_txlen++] = BYTE0(topiclen);memcpy(&mqtt_txbuf[mqtt_txlen], topic, topiclen);mqtt_txlen += topiclen;/* SUBSCRIBE 还需在每个过滤器后附带“请求 QoS”字节;UNSUBSCRIBE 不需要 */if(whether){mqtt_txbuf[mqtt_txlen++] = qos;}/* 发送并等待 SUBACK(或者 UNSUBACK) */memset(mqtt_rxbuf,0,mqtt_rxlen);mqtt_send_data(mqtt_txbuf,mqtt_txlen);for(j=0;j<10;j++){delay_ms(50);if (esp8266_wait_receive() == ESP8266_EOK)esp8266_copy_rxdata((char *)mqtt_rxbuf);/* 粗略判断:SUBACK 开头 0x90 0x03。更严谨应校验 packet id 和返回码 */if(mqtt_rxbuf[0]==parket_subAck[0] && mqtt_rxbuf[1]==parket_subAck[1]){return 0; /* 成功 */}}return 1; /* 失败 */
}/* 发布 PUBLISH 报文topic=主题;message=载荷字符串;qos=0/1(当前仅处理 0/1)返回 发送字节数(不等于是否成功) */
uint8_t mqtt_publish_data(char *topic, char *message, uint8_t qos)
{int topicLength = strlen(topic);               /* 主题长度 */int messageLength = strlen(message);           /* 载荷长度 */static uint16_t id=0;                          /* Packet Identifier 递增(QoS1 使用) */int DataLen;                                   /* 剩余长度 */mqtt_txlen=0;                                  /* 写指针清零 *//* 剩余长度 = 主题 (2+len) + [PacketId(2,若QoS>0)] + 载荷 */if(qos)  DataLen = (2+topicLength) + 2 + messageLength;else     DataLen = (2+topicLength) +     messageLength;/* 固定报头首字节:0x30 = PUBLISH, dup=0, qos=0, retain=0★注意:若 qos=1,应将首字节置为 0x32(qos 位=01),这里未随 qos 修改,仅做示例 */mqtt_txbuf[mqtt_txlen++] = 0x30;   /* 建议:qos==1 时改为 0x32 *//* 剩余长度 VarInt 编码 */do{uint8_t encodedByte = DataLen % 128;DataLen = DataLen / 128;if ( DataLen > 0 )encodedByte = encodedByte | 128;mqtt_txbuf[mqtt_txlen++] = encodedByte;}while ( DataLen > 0 );/* 主题:2 字节长度 + 内容 */mqtt_txbuf[mqtt_txlen++] = BYTE1(topicLength);mqtt_txbuf[mqtt_txlen++] = BYTE0(topicLength);memcpy(&mqtt_txbuf[mqtt_txlen], topic, topicLength);mqtt_txlen += topicLength;/* 若 QoS>0,需要附带 Packet Identifier(大端) */if(qos){mqtt_txbuf[mqtt_txlen++] = BYTE1(id);mqtt_txbuf[mqtt_txlen++] = BYTE0(id);id++;  /* 下次递增(注意溢出回绕) */}/* 拷贝载荷 */memcpy(&mqtt_txbuf[mqtt_txlen], message, messageLength);mqtt_txlen += messageLength;/* 透传发送到 TCP */mqtt_send_data(mqtt_txbuf,mqtt_txlen);return mqtt_txlen;  /* 返回发送长度(用于调试) */
}/* 解析接收到的 PUBLISH 报文(简化版,仅处理 0x30/0x32/0x34 三种首字节)data_received = 原始字节流rx_data       = 输出结构体(主题/载荷/长度)返回 0 成功;非 0=失败 */
uint8_t mqtt_receive_handle(uint8_t *data_received, Mqtt_RxData_Type *rx_data)
{uint8_t *p;                       /* 读指针 */uint8_t encodeByte = 0;           /* VarInt 临时字节 */uint32_t multiplier = 1;          /* VarInt 进位乘数 */uint32_t Remaining_len = 0;       /* 剩余长度 */uint8_t QS_level = 0;             /* 标记 QoS 是否>0 */p = data_received;                                /* 指向起始 */memset(rx_data, 0, sizeof(Mqtt_RxData_Type));     /* 清空输出结构体 *//* 仅处理 PUBLISH(0x30/0x32/0x34) */if((*p != 0x30)&&(*p != 0x32)&&(*p != 0x34))return 1; /* 非 PUBLISH */if(*p != 0x30) QS_level = 1;       /* 0x32/0x34 表示 QoS1/2,标记需要跳过 PacketId */p++;                                /* 指向剩余长度字段 *//* 解析 VarInt:每字节低7位有效,高位1表示还有后续字节 */do{encodeByte = *p++;Remaining_len += (encodeByte & 0x7F) * multiplier;multiplier *= 128;if(multiplier > 128*128*128)    /* 最多4字节,超过视为错误 */return 2;}while((encodeByte & 0x80) != 0);/* 主题长度(2字节大端) */rx_data->topic_len = *p++;rx_data->topic_len = rx_data->topic_len * 256 + *p++;/* 拷贝主题字符串(非 \0 结尾,使用长度字段) */memcpy(rx_data->topic, p, rx_data->topic_len);p += rx_data->topic_len;/* 若 QoS>0,紧随其后是 2字节 PacketId,需跳过 */if(QS_level != 0)p += 2;/* 载荷长度 = 剩余长度 - 主题(2+len) - [packetId(2,若QoS>0)] */rx_data->payload_len = Remaining_len - rx_data->topic_len - 2;memcpy(rx_data->payload, p, rx_data->payload_len);   /* 拷贝载荷 */return 0; /* 解析成功 */
}/* 发送“属性设置应答”(示例)id=对端请求的 id(字符串),回包 {"id":"xxx","code":200,"msg":"success"} */
void mqtt_send_response(uint8_t *id)
{char buf[128] = {0};                                         /* 本地缓冲 */sprintf(buf,"{\"id\":\"%s\",\"code\":200,\"msg\":\"success\"}",id); /* 组包 JSON */mqtt_publish_data(RELY_PUBLISH_TOPIC,(char *)buf,0);         /* 发布到应答主题,QoS=0 */printf("\r\n发布数据:\r\n");printf((const char *)buf);                                   /* 打印调试 */printf("\r\n");
}/* 发送心跳(PINGREQ) */
void mqtt_send_heart(void)
{mqtt_send_data((uint8_t *)parket_heart, sizeof(parket_heart));
}/* 发送 DISCONNECT */
void mqtt_disconnect(void)
{mqtt_send_data((uint8_t *)parket_disconnet, sizeof(parket_disconnet));
}/* 底层发送函数封装:走 ESP8266 透传 */
void mqtt_send_data(uint8_t *buf,uint16_t len)
{esp8266_send_data((char *)buf, len);  /* 直接下发到 TCP */
}

http://www.lryc.cn/news/613812.html

相关文章:

  • 引领云原生时代,华为云助您构建敏捷未来
  • ChatGPT模型选择器详解:全面了解GPT-4o、GPT-4.5、o3等模型的切换与使用策略(2025最新版)
  • Flink的时间语义
  • 数学建模——遗传算法
  • QT中的trimmed() 方法(1)
  • 从大数据视角理解时序数据库选型:为何选择 Apache IoTDB?
  • RabbitMQ 如何实现高可用
  • Spring AMQP 入门与实践:整合 RabbitMQ 构建可靠消息系统
  • 2025华数杯数学建模A题【 多孔膜光反射性能的优化与控制】原创论文讲解(含完整python代码)
  • ClickHouse、Doris、OpenSearch、Splunk、Solr系统化分析
  • ​「解决方案」Linux 无法在 NTFS 硬盘上创建文件/文件夹的问题
  • 【FreeRTOS】任务间通讯3:互斥量- Mutex
  • linux添加自启动
  • wodpress结构化数据对SEO的作用
  • simulink tlc如何通过tlc写数据入文件
  • 基于UDP的代理协议的Tuic怎么样?
  • GoLand 项目从 0 到 1:第六天 —— 权限接口开发与问题攻坚
  • 构建响应式在线客服聊天系统的前端实践 Vue3+ElementUI + CSS3
  • 走进Linux世界:make和makefile
  • Seaborn 学习笔记
  • LINUX-用户及用户组管理
  • 【嵌入式】记录一次网络转串口透传性能提升的过程
  • 【Linux系统】万字解析,文件IO
  • Android 系统的安全 和 三星安全的区别
  • 华为USG防火墙双机,但ISP只给了1个IP, 怎么办?
  • 5. 缓存-Redis
  • 【Android笔记】Android 自定义 TextView 实现垂直渐变字体颜色(支持 XML 配置)
  • 考研复习-计算机组成原理-第四章-指令系统
  • wstool和git submodule优劣势对比
  • WinForm 对话框的 Show 与 ShowDialog:阻塞与非阻塞的抉择