当前位置: 首页 > news >正文

十七、Rust集成MQTT Client

1、信息整理

        目前了解到的Rust MQTT项目有:

  • bytebeamio/rumqtt

    • 1.3k star、717 commits、Contributors 78、
    • tokio、futures、tls、
    • rumqttc(client):cargo add rumqttc
      • https://github.com/bytebeamio/rumqtt/tree/main/rumqttc
    • rumqttd(server):docker run -p 1883:1883 -p 1884:1884 -it bytebeamio/rumqttd
  • ntex-rs/ntex-mqtt、crates/ntex-mqtt

    • 258 star、504 commits、Contributors 15
    • ntex、MQTT Client/Server、v5 and v3.1.1 protocols
  • eclipse/paho.mqtt.rust

    • 463 star、368 commits、
    • 异步、SSL/TLS、cargo add paho-mqtt = "0.12"
    • 但看自述文档所述,用于 musl linux 环境,有代码侵入。
    fn is_musl() -> bool {std::env::var("CARGO_CFG_TARGET_ENV").unwrap() == "musl"
    }
    
  • rmqtt/rmqtt:355 star,871 commits

    • 基于 ( tokio、ntex、ntex-mqtt )
    • 一个纯服务端,遍观项目,貌似不能做 client 使用。自身的测试,使用 paho.mqtt.testing 项目完成。
    • 分布式集群(raft),TLS,WebSocket,WebSocket-TLS,x86/Arm
    • 单个服务节点上处理百万级别的并发客户端。集群提供相同的连接量、吞吐量,应该只是解决高可用问题。

2、开始集成

       本次集成作为 Client 使用,选择了最前面的 bytebeamio/rumqtt 。

  • boot/mqtt.rs
use std::sync::Arc;
use std::time::Duration;use once_cell::sync::OnceCell;
use rumqttc::{AsyncClient, MqttOptions};
use serde::{Deserialize, Serialize};pub static MQTT_CLIENT: OnceCell<Arc<AsyncClient>> = OnceCell::new();#[derive(Debug, Default, Validate, Serialize, Deserialize)]
pub struct MqttMessage {pub addr: Option<String>,#[validate(range(min = 0, max = 1))]pub action: u8,
}pub async fn start() {let mut mqtt_options = MqttOptions::new("rumqtt-async", "192.168.1.110", 1883);mqtt_options.set_keep_alive(Duration::from_secs(5));let (client, mut event_loop) = AsyncClient::new(mqtt_options, 100);MQTT_CLIENT.get_or_init(|| { Arc::new(client) });tokio::spawn(async move {loop { while let Ok(notification) = event_loop.poll().await {}; }});log::info!("this point run ...");
}
  • boot/mod.rs
pub mod mqtt;pub async fn start() {// xxx others initializationmqtt::start().await;
}
  • modules/mod.rs
pub mod switch;pub mod handler {use actix_web::dev::HttpServiceFactory;use actix_web::web;use crate::module::{switch};pub fn api_routes() -> impl HttpServiceFactory {web::scope("").service(switch::api::index)}
}
  • modules/switch/api.rs
use rumqttc::QoS::AtLeastOnce;
use validator::Validate;use crate::boot::mqtt::{MQTT_CLIENT, MqttMessage};#[post("/switch/{id}")]
pub async fn index(Path(id): Path<String>, mut msg: Json<MqttMessage>) -> impl Responder {let ok = vec!["Hello World!", "Hello World!"];if let Err(e) = msg.validate() { return HttpResponse::BadRequest().json(e); }msg.addr = Some(id);let json = serde_json::to_string(&msg).unwrap();MQTT_CLIENT.get().unwrap().publish("/hello", AtLeastOnce, false, json).await.unwrap();HttpResponse::Ok().json(ok)
}
  • main.rs
use actix_web::{App, HttpServer, middleware};
use booking::{boot, module};#[actix_web::main]
async fn main() -> std::io::Result<()> {boot::start().await;HttpServer::new(move || App::new()// TODO other initialization.service(module::handler::api_routes())// other routes).bind("0.0.0.0:8080")?.run().await
}

       代码贴完,剩下真是没啥可说的,拜了个 bye ~

http://www.lryc.cn/news/238973.html

相关文章:

  • HarmonyOS ArkTS开发语言介绍(三)
  • [架构之路-247]:目标系统 - 设计方法 - 软件工程 - 结构化方法的基本思想、本质、特点以及在软件开发、在生活中的应用
  • 大模型的交互能力
  • 80%测试员被骗,关于jmeter 的一个弥天大谎!
  • Git——感谢尚硅谷官方文档
  • Java WebSocket框架
  • C#实现本地服务器客户端私聊通信
  • PyTorch 之 Dataset 类入门学习
  • Java update scheduler
  • 常见树种(贵州省):006栎类
  • 拓扑排序-
  • Oracle数据库如何定位trace file位置
  • 电脑盘符错乱,C盘变成D盘怎么办?
  • Android WMS——客户端输入事件处理(十九)
  • Python基础学习__测试报告
  • bclinux aarch64 ceph 14.2.10 云主机 4节点 fio
  • 智能座舱架构与芯片- (14) 测试篇 上
  • 【Django-DRF用法】多年积累md笔记,第3篇:Django-DRF的序列化和反序列化详解
  • Redis主从复制,哨兵和Cluster集群
  • Linux嵌入式I2C协议笔记
  • 科技的成就(五十三)
  • Ubuntu22.04 编译 AOSP
  • 【计算机网络】多路复用的三种方案
  • 供应链和物流的自动化新时代
  • Python与ArcGIS系列(九)自定义python地理处理工具
  • Nginx部署前端项目
  • 根据文件类型进行下载, 文档/图片
  • 赋范线性空间3
  • XSLVGL2.0 User Manual 缩略图生成器(v2.0)
  • 练习八-利用有限状态机进行时序逻辑的设计