当前位置: 首页 > news >正文

InfluxDB 与 MQTT 协议集成实践(二)

集成实现步骤

配置 MQTT 客户端

在 Python 中,我们使用paho - mqtt库来配置 MQTT 客户端。paho - mqtt库是一个 Python 的 MQTT 客户端库,它提供了简单易用的接口,方便我们与 MQTT Broker 进行通信。

首先,确保已经安装了paho - mqtt库。如果没有安装,可以使用以下命令进行安装 :

pip install paho - mqtt

下面是一个简单的 MQTT 客户端配置示例,展示了如何连接到 MQTT Broker、订阅主题以及接收消息 :

import paho.mqtt.client as mqtt

# 连接成功回调函数

def on_connect(client, userdata, flags, rc):

if rc == 0:

print("Connected to MQTT Broker!")

# 订阅主题

client.subscribe("sensor/data")

else:

print(f"Failed to connect, return code {rc}")

# 接收消息回调函数

def on_message(client, userdata, msg):

print(f"Received message on topic '{msg.topic}': {msg.payload.decode()}")

# 创建MQTT客户端实例

client = mqtt.Client()

# 设置连接成功回调函数

client.on_connect = on_connect

# 设置接收消息回调函数

client.on_message = on_message

# 设置MQTT Broker地址和端口

broker_address = "localhost"

port = 1883

# 连接到MQTT Broker

client.connect(broker_address, port, 60)

# 启动循环,保持连接并接收消息

client.loop_start()

try:

while True:

pass

except KeyboardInterrupt:

print("Exiting...")

# 停止循环

client.loop_stop()

# 断开连接

client.disconnect()

在上述代码中:

  • on_connect函数是连接成功后的回调函数,当客户端成功连接到 MQTT Broker 时,会调用这个函数。在这个函数中,首先检查连接结果rc,如果rc为 0,表示连接成功,然后订阅了名为sensor/data的主题。
  • on_message函数是接收消息后的回调函数,当客户端接收到订阅主题的消息时,会调用这个函数。它打印出接收到消息的主题和内容。
  • 创建了一个mqtt.Client()实例,并将on_connect和on_message函数分别设置为连接成功和接收消息的回调函数。
  • 使用client.connect方法连接到指定地址和端口的 MQTT Broker。
  • 通过client.loop_start启动一个循环,用于保持与 MQTT Broker 的连接,并接收来自 Broker 的消息。最后,通过KeyboardInterrupt捕获用户的中断操作(如按下 Ctrl+C),停止循环并断开与 MQTT Broker 的连接。

数据写入 InfluxDB

接下来,我们需要将接收到的 MQTT 消息写入 InfluxDB。这里使用influxdb - client库来与 InfluxDB 进行交互。首先,安装influxdb - client库 :

pip install influxdb - client

假设已经在 MQTT 客户端的on_message回调函数中接收到了消息,下面展示如何将这些消息写入 InfluxDB :

from influxdb_client import InfluxDBClient, Point

from influxdb_client.client.write_api import SYNCHRONOUS

# InfluxDB配置

token = "your_token"

org = "your_org"

bucket = "your_bucket"

url = "http://localhost:8086"

# 创建InfluxDB客户端实例

client = InfluxDBClient(url=url, token=token, org=org)

write_api = client.write_api(write_options=SYNCHRONOUS)

def on_message(client, userdata, msg):

# 解析MQTT消息内容

message = msg.payload.decode()

# 假设消息内容是JSON格式,解析JSON数据

try:

import json

data = json.loads(message)

# 构建InfluxDB的Point对象

point = Point("sensor_measurement")

for key, value in data.items():

point.field(key, value)

# 写入数据到InfluxDB

write_api.write(bucket, org, point)

print(f"Data written to InfluxDB: {data}")

except json.JSONDecodeError:

print("Received message is not in valid JSON format")

在上述代码中:

  • 首先配置了 InfluxDB 的访问令牌token、组织名称org、存储桶名称bucket以及 URL。
  • 创建了一个InfluxDBClient实例,并通过write_api获取写入 API,设置写入选项为同步模式SYNCHRONOUS,这样可以确保数据写入操作是同步进行的,便于调试和保证数据写入的顺序性。
  • 在on_message函数中,首先将接收到的 MQTT 消息负载解码为字符串。然后假设消息内容是 JSON 格式,尝试解析 JSON 数据。如果解析成功,遍历解析后的数据,为每个键值对添加一个字段到Point对象中,Point对象的名称为sensor_measurement,表示测量值(类似于 InfluxDB 中的表名)。最后,使用write_api.write方法将Point对象写入到指定的存储桶和组织中。如果消息不是有效的 JSON 格式,则打印错误信息。

数据格式转换与处理

在将 MQTT 消息写入 InfluxDB 之前,通常需要进行数据格式转换和处理。MQTT 消息的格式多种多样,而 InfluxDB 要求数据以特定的 Line Protocol 格式写入。例如,如果 MQTT 消息是 JSON 格式,我们需要将其转换为 InfluxDB 的 Line Protocol 格式。

以下是一个将 JSON 格式的 MQTT 消息转换为 Line Protocol 格式的示例 :

import json

def json_to_line_protocol(json_data, measurement):

point = f"{measurement}"

tags = ""

fields = ""

timestamp = ""

for key, value in json_data.items():

if isinstance(value, (int, float)):

if fields:

fields += ","

fields += f"{key}={value}"

elif isinstance(value, str):

if tags:

tags += ","

tags += f"{key}=\"{value}\""

elif key == "time":

timestamp = f" {int(value)}" if isinstance(value, int) else f" {int(pd.Timestamp(value).timestamp() * 1000 * 1000 * 1000)}"

if tags:

point += f",{tags}"

if fields:

point += f" {fields}"

if timestamp:

point += timestamp

return point

# 示例JSON数据

json_data = {

"sensor_id": "sensor1",

"temperature": 25.5,

"humidity": 60,

"time": "2023-10-05T12:00:00Z"

}

measurement_name = "environment_data"

line_protocol = json_to_line_protocol(json_data, measurement_name)

print(line_protocol)

在上述代码中:

  • json_to_line_protocol函数接收 JSON 数据和测量值名称作为参数。
  • 遍历 JSON 数据中的每个键值对,根据值的类型进行处理。如果值是数字类型(int或float),将其作为字段添加到fields字符串中;如果值是字符串类型,将其作为标签添加到tags字符串中;如果键是time,则提取时间戳并进行格式化处理,存储到timestamp字符串中。
  • 最后,将测量值名称、标签、字段和时间戳按照 Line Protocol 的格式拼接成一个完整的字符串并返回。

此外,在实际应用中,还可能需要对数据进行清洗和预处理,例如去除异常值、填补缺失值等。以下是一个简单的数据清洗示例,假设我们要去除温度数据中的异常值(假设温度值应该在 - 20 到 50 之间) :

def clean_data(data):

if "temperature" in data:

temperature = data["temperature"]

if temperature < -20 or temperature > 50:

del data["temperature"]

return data

# 示例数据

raw_data = {

"sensor_id": "sensor1",

"temperature": 100,

"humidity": 60

}

cleaned_data = clean_data(raw_data)

print(cleaned_data)

在这个示例中,clean_data函数检查数据中的温度值,如果温度值超出了合理范围(-20 到 50),则删除该温度字段,从而实现对数据的清洗。通过这些数据格式转换和处理步骤,可以确保 MQTT 消息能够正确、有效地写入 InfluxDB 中。

案例实践

场景描述

为了更直观地展示 InfluxDB 与 MQTT 协议集成的实际应用效果,我们以智能工厂中的设备监控场景为例进行实践。在这个智能工厂中,分布着大量的生产设备,每台设备上都安装了多种传感器,如温度传感器、压力传感器、转速传感器等 。这些传感器以一定的时间间隔(例如每 5 秒)采集设备的运行数据,并通过 MQTT 协议将数据发送到 Mosquitto 消息代理。

Mosquitto 消息代理负责接收来自各个传感器的消息,并根据消息的主题进行分发。我们的 Python 应用程序作为 MQTT 客户端,订阅了与设备数据相关的主题(如 “factory/device1/data”“factory/device2/data” 等),当接收到消息时,会对消息进行处理和解析,然后将解析后的数据写入 InfluxDB 中。

InfluxDB 作为时间序列数据库,负责存储这些设备运行数据。我们可以通过 InfluxDB 的查询功能,对存储的数据进行分析和查询,例如查询某台设备在过去一小时内的平均温度、某时间段内设备的运行状态变化等。通过对这些数据的分析,工厂管理人员可以及时了解设备的运行状况,预测设备故障,优化生产流程,提高生产效率和产品质量。

代码实现细节

下面展示完整的 Python 代码,该代码实现了 MQTT 订阅、数据处理和 InfluxDB 写入的功能 :

import paho.mqtt.client as mqtt

from influxdb_client import InfluxDBClient, Point

from influxdb_client.client.write_api import SYNCHRONOUS

import json

# InfluxDB配置

token = "your_token"

org = "your_org"

bucket = "your_bucket"

url = "http://localhost:8086"

# 创建InfluxDB客户端实例

influx_client = InfluxDBClient(url=url, token=token, org=org)

write_api = influx_client.write_api(write_options=SYNCHRONOUS)

# MQTT连接成功回调函数

def on_connect(client, userdata, flags, rc):

if rc == 0:

print("Connected to MQTT Broker!")

# 订阅主题

client.subscribe("factory/#")

else:

print(f"Failed to connect, return code {rc}")

# MQTT接收消息回调函数

def on_message(client, userdata, msg):

try:

# 解析MQTT消息内容,假设消息是JSON格式

message = msg.payload.decode()

data = json.loads(message)

# 构建InfluxDB的Point对象

point = Point("device_measurement")

for key, value in data.items():

point.field(key, value)

# 获取设备ID作为标签

device_id = data.get("device_id")

if device_id:

point.tag("device_id", device_id)

# 写入数据到InfluxDB

write_api.write(bucket, org, point)

print(f"Data written to InfluxDB: {data}")

except json.JSONDecodeError:

print("Received message is not in valid JSON format")

# 创建MQTT客户端实例

mqtt_client = mqtt.Client()

# 设置连接成功回调函数

mqtt_client.on_connect = on_connect

# 设置接收消息回调函数

mqtt_client.on_message = on_message

# 设置MQTT Broker地址和端口

broker_address = "localhost"

port = 1883

# 设置MQTT用户名和密码(如果需要)

# mqtt_client.username_pw_set("your_username", "your_password")

# 连接到MQTT Broker

mqtt_client.connect(broker_address, port, 60)

# 启动循环,保持连接并接收消息

mqtt_client.loop_start()

try:

while True:

pass

except KeyboardInterrupt:

print("Exiting...")

# 停止循环

mqtt_client.loop_stop()

# 断开连接

mqtt_client.disconnect()

# 关闭InfluxDB客户端

influx_client.close()

在上述代码中:

  • 首先配置了 InfluxDB 的访问令牌token、组织名称org、存储桶名称bucket以及 URL,并创建了InfluxDBClient实例和写入 API。
  • on_connect函数是 MQTT 连接成功后的回调函数,当客户端成功连接到 MQTT Broker 时,会订阅 “factory/#” 主题,该主题可以接收以 “factory/” 开头的所有子主题消息,确保能够获取到工厂中所有设备的数据。
  • on_message函数是 MQTT 接收消息后的回调函数。在这个函数中,首先尝试解析接收到的 JSON 格式的消息。然后,根据消息内容构建 InfluxDB 的Point对象,将消息中的每个键值对作为字段添加到Point中。同时,从消息中获取设备 ID,并将其作为标签添加到Point中,以便在 InfluxDB 中可以根据设备 ID 对数据进行分类和查询。最后,使用 InfluxDB 的写入 API 将Point对象写入到指定的存储桶和组织中。如果消息不是有效的 JSON 格式,则打印错误信息。
  • 创建了mqtt.Client()实例,并将on_connect和on_message函数分别设置为连接成功和接收消息的回调函数。然后连接到指定地址和端口的 MQTT Broker,并启动循环以保持连接并接收消息。在程序结束时,通过捕获KeyboardInterrupt异常,停止 MQTT 循环,断开与 MQTT Broker 的连接,并关闭 InfluxDB 客户端。

结果展示与分析

为了展示 InfluxDB 中存储的数据,我们可以使用 Grafana 等数据可视化工具。首先,在 Grafana 中添加 InfluxDB 作为数据源,配置好 InfluxDB 的 URL、访问令牌、组织和存储桶等信息。然后,创建一个新的仪表板,添加图表组件,通过编写 InfluxQL 查询语句来获取我们需要展示的数据。

例如,我们想要展示设备 1 在过去 24 小时内的温度变化趋势,可以编写如下 InfluxQL 查询语句 :

SELECT mean("temperature") FROM "device_measurement" WHERE "device_id" = 'device1' AND time >= now() - 24h GROUP BY time(10m)

上述查询语句的含义是:从 “device_measurement” 测量值(类似于表)中选择 “temperature” 字段的平均值,条件是 “device_id” 标签为 “device1” 且时间在过去 24 小时内,最后按照每 10 分钟的时间间隔进行分组。

通过 Grafana 的图表展示,我们可以清晰地看到设备 1 的温度随时间的变化趋势,如下图所示:

[此处插入设备 1 温度变化趋势图]

从图表中可以分析出,设备 1 在某些时间段内温度较高,可能需要关注设备的散热情况;而在其他时间段内温度较为稳定。通过对这些数据的分析,我们可以及时发现设备运行过程中可能存在的问题,提前采取措施进行维护和优化,避免设备故障对生产造成影响。同时,也可以根据这些数据对设备的性能进行评估,为设备的升级和改造提供依据。此外,还可以进一步分析不同设备之间的数据差异,找出生产效率较高的设备,总结经验并推广到其他设备上,从而提高整个工厂的生产效率和产品质量。

http://www.lryc.cn/news/601721.html

相关文章:

  • Linux网络-------2.应⽤层⾃定义协议与序列化
  • 基于深度学习的图像分割:使用DeepLabv3实现高效分割
  • 【C语言网络编程】HTTP 客户端请求(基于 Socket 的完整实现)
  • 程序代码篇---python向http界面发送数据
  • 【QT入门到晋级】window opencv安装及引入qtcreator(包含两种qt编译器:MSVC和MinGW)
  • 字节前端面试知识点总结
  • 应对反爬机制的具体方法与策略
  • 《 接口日志与异常处理统一设计:AOP与全局异常捕获》
  • Android 调试桥 (adb) 基础知识点
  • 【C 学习】02-究竟什么是C?
  • 【论文阅读】ON THE ROLE OF ATTENTION HEADS IN LARGE LANGUAGE MODEL SAFETY
  • 一文快速了解Docker和命令详解
  • 深度学习中的计算图与自动微分原理:静态图与动态图的实现差异
  • Leetcode力扣解题记录--第136题(查找单数)
  • Springboot+Layui英语单词学习系统的设计与实现
  • MyBatis Plus 分页
  • WiFi Mouse PC端 v1.7.2 官方中文版
  • 《杜甫传》读书笔记与经典摘要(三)流亡与走向人民
  • SPSC无锁环形队列技术(C++)
  • 系统整理Python的循环语句和常用方法
  • CPA青少年编程能力等级测评试卷及答案 Python编程(三级)
  • 详解力扣高频SQL50题之610. 判断三角形【简单】
  • 内存泄漏问题排查
  • idea打开后project窗口未显示项目名称的解决方案
  • 24点数学游戏(穷举法求解表达式)
  • 【计算机网络架构】网状型架构简介
  • Java学习-------序列化与反序列化
  • Windows10+WSL2+Docker相关整理
  • 2025年Agent创业实战指南:从0到1打造高增长AI智能体项目
  • ABP VNext + Elastic APM:微服务性能监控