InfluxDB 与 MQTT 协议集成实践(二)
集成实现步骤
配置 MQTT 客户端
在 Python 中,我们使用paho - mqtt库来配置 MQTT 客户端。paho - mqtt库是一个 Python 的 MQTT 客户端库,它提供了简单易用的接口,方便我们与 MQTT Broker 进行通信。
首先,确保已经安装了paho - mqtt库。如果没有安装,可以使用以下命令进行安装 :
pip install paho - mqtt
下面是一个简单的 MQTT 客户端配置示例,展示了如何连接到 MQTT Broker、订阅主题以及接收消息 :
import paho.mqtt.client as mqtt
# 连接成功回调函数
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
# 订阅主题
client.subscribe("sensor/data")
else:
print(f"Failed to connect, return code {rc}")
# 接收消息回调函数
def on_message(client, userdata, msg):
print(f"Received message on topic '{msg.topic}': {msg.payload.decode()}")
# 创建MQTT客户端实例
client = mqtt.Client()
# 设置连接成功回调函数
client.on_connect = on_connect
# 设置接收消息回调函数
client.on_message = on_message
# 设置MQTT Broker地址和端口
broker_address = "localhost"
port = 1883
# 连接到MQTT Broker
client.connect(broker_address, port, 60)
# 启动循环,保持连接并接收消息
client.loop_start()
try:
while True:
pass
except KeyboardInterrupt:
print("Exiting...")
# 停止循环
client.loop_stop()
# 断开连接
client.disconnect()
在上述代码中:
- on_connect函数是连接成功后的回调函数,当客户端成功连接到 MQTT Broker 时,会调用这个函数。在这个函数中,首先检查连接结果rc,如果rc为 0,表示连接成功,然后订阅了名为sensor/data的主题。
- on_message函数是接收消息后的回调函数,当客户端接收到订阅主题的消息时,会调用这个函数。它打印出接收到消息的主题和内容。
- 创建了一个mqtt.Client()实例,并将on_connect和on_message函数分别设置为连接成功和接收消息的回调函数。
- 使用client.connect方法连接到指定地址和端口的 MQTT Broker。
- 通过client.loop_start启动一个循环,用于保持与 MQTT Broker 的连接,并接收来自 Broker 的消息。最后,通过KeyboardInterrupt捕获用户的中断操作(如按下 Ctrl+C),停止循环并断开与 MQTT Broker 的连接。
数据写入 InfluxDB
接下来,我们需要将接收到的 MQTT 消息写入 InfluxDB。这里使用influxdb - client库来与 InfluxDB 进行交互。首先,安装influxdb - client库 :
pip install influxdb - client
假设已经在 MQTT 客户端的on_message回调函数中接收到了消息,下面展示如何将这些消息写入 InfluxDB :
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
# InfluxDB配置
token = "your_token"
org = "your_org"
bucket = "your_bucket"
url = "http://localhost:8086"
# 创建InfluxDB客户端实例
client = InfluxDBClient(url=url, token=token, org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)
def on_message(client, userdata, msg):
# 解析MQTT消息内容
message = msg.payload.decode()
# 假设消息内容是JSON格式,解析JSON数据
try:
import json
data = json.loads(message)
# 构建InfluxDB的Point对象
point = Point("sensor_measurement")
for key, value in data.items():
point.field(key, value)
# 写入数据到InfluxDB
write_api.write(bucket, org, point)
print(f"Data written to InfluxDB: {data}")
except json.JSONDecodeError:
print("Received message is not in valid JSON format")
在上述代码中:
- 首先配置了 InfluxDB 的访问令牌token、组织名称org、存储桶名称bucket以及 URL。
- 创建了一个InfluxDBClient实例,并通过write_api获取写入 API,设置写入选项为同步模式SYNCHRONOUS,这样可以确保数据写入操作是同步进行的,便于调试和保证数据写入的顺序性。
- 在on_message函数中,首先将接收到的 MQTT 消息负载解码为字符串。然后假设消息内容是 JSON 格式,尝试解析 JSON 数据。如果解析成功,遍历解析后的数据,为每个键值对添加一个字段到Point对象中,Point对象的名称为sensor_measurement,表示测量值(类似于 InfluxDB 中的表名)。最后,使用write_api.write方法将Point对象写入到指定的存储桶和组织中。如果消息不是有效的 JSON 格式,则打印错误信息。
数据格式转换与处理
在将 MQTT 消息写入 InfluxDB 之前,通常需要进行数据格式转换和处理。MQTT 消息的格式多种多样,而 InfluxDB 要求数据以特定的 Line Protocol 格式写入。例如,如果 MQTT 消息是 JSON 格式,我们需要将其转换为 InfluxDB 的 Line Protocol 格式。
以下是一个将 JSON 格式的 MQTT 消息转换为 Line Protocol 格式的示例 :
import json
def json_to_line_protocol(json_data, measurement):
point = f"{measurement}"
tags = ""
fields = ""
timestamp = ""
for key, value in json_data.items():
if isinstance(value, (int, float)):
if fields:
fields += ","
fields += f"{key}={value}"
elif isinstance(value, str):
if tags:
tags += ","
tags += f"{key}=\"{value}\""
elif key == "time":
timestamp = f" {int(value)}" if isinstance(value, int) else f" {int(pd.Timestamp(value).timestamp() * 1000 * 1000 * 1000)}"
if tags:
point += f",{tags}"
if fields:
point += f" {fields}"
if timestamp:
point += timestamp
return point
# 示例JSON数据
json_data = {
"sensor_id": "sensor1",
"temperature": 25.5,
"humidity": 60,
"time": "2023-10-05T12:00:00Z"
}
measurement_name = "environment_data"
line_protocol = json_to_line_protocol(json_data, measurement_name)
print(line_protocol)
在上述代码中:
- json_to_line_protocol函数接收 JSON 数据和测量值名称作为参数。
- 遍历 JSON 数据中的每个键值对,根据值的类型进行处理。如果值是数字类型(int或float),将其作为字段添加到fields字符串中;如果值是字符串类型,将其作为标签添加到tags字符串中;如果键是time,则提取时间戳并进行格式化处理,存储到timestamp字符串中。
- 最后,将测量值名称、标签、字段和时间戳按照 Line Protocol 的格式拼接成一个完整的字符串并返回。
此外,在实际应用中,还可能需要对数据进行清洗和预处理,例如去除异常值、填补缺失值等。以下是一个简单的数据清洗示例,假设我们要去除温度数据中的异常值(假设温度值应该在 - 20 到 50 之间) :
def clean_data(data):
if "temperature" in data:
temperature = data["temperature"]
if temperature < -20 or temperature > 50:
del data["temperature"]
return data
# 示例数据
raw_data = {
"sensor_id": "sensor1",
"temperature": 100,
"humidity": 60
}
cleaned_data = clean_data(raw_data)
print(cleaned_data)
在这个示例中,clean_data函数检查数据中的温度值,如果温度值超出了合理范围(-20 到 50),则删除该温度字段,从而实现对数据的清洗。通过这些数据格式转换和处理步骤,可以确保 MQTT 消息能够正确、有效地写入 InfluxDB 中。
案例实践
场景描述
为了更直观地展示 InfluxDB 与 MQTT 协议集成的实际应用效果,我们以智能工厂中的设备监控场景为例进行实践。在这个智能工厂中,分布着大量的生产设备,每台设备上都安装了多种传感器,如温度传感器、压力传感器、转速传感器等 。这些传感器以一定的时间间隔(例如每 5 秒)采集设备的运行数据,并通过 MQTT 协议将数据发送到 Mosquitto 消息代理。
Mosquitto 消息代理负责接收来自各个传感器的消息,并根据消息的主题进行分发。我们的 Python 应用程序作为 MQTT 客户端,订阅了与设备数据相关的主题(如 “factory/device1/data”“factory/device2/data” 等),当接收到消息时,会对消息进行处理和解析,然后将解析后的数据写入 InfluxDB 中。
InfluxDB 作为时间序列数据库,负责存储这些设备运行数据。我们可以通过 InfluxDB 的查询功能,对存储的数据进行分析和查询,例如查询某台设备在过去一小时内的平均温度、某时间段内设备的运行状态变化等。通过对这些数据的分析,工厂管理人员可以及时了解设备的运行状况,预测设备故障,优化生产流程,提高生产效率和产品质量。
代码实现细节
下面展示完整的 Python 代码,该代码实现了 MQTT 订阅、数据处理和 InfluxDB 写入的功能 :
import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import json
# InfluxDB配置
token = "your_token"
org = "your_org"
bucket = "your_bucket"
url = "http://localhost:8086"
# 创建InfluxDB客户端实例
influx_client = InfluxDBClient(url=url, token=token, org=org)
write_api = influx_client.write_api(write_options=SYNCHRONOUS)
# MQTT连接成功回调函数
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
# 订阅主题
client.subscribe("factory/#")
else:
print(f"Failed to connect, return code {rc}")
# MQTT接收消息回调函数
def on_message(client, userdata, msg):
try:
# 解析MQTT消息内容,假设消息是JSON格式
message = msg.payload.decode()
data = json.loads(message)
# 构建InfluxDB的Point对象
point = Point("device_measurement")
for key, value in data.items():
point.field(key, value)
# 获取设备ID作为标签
device_id = data.get("device_id")
if device_id:
point.tag("device_id", device_id)
# 写入数据到InfluxDB
write_api.write(bucket, org, point)
print(f"Data written to InfluxDB: {data}")
except json.JSONDecodeError:
print("Received message is not in valid JSON format")
# 创建MQTT客户端实例
mqtt_client = mqtt.Client()
# 设置连接成功回调函数
mqtt_client.on_connect = on_connect
# 设置接收消息回调函数
mqtt_client.on_message = on_message
# 设置MQTT Broker地址和端口
broker_address = "localhost"
port = 1883
# 设置MQTT用户名和密码(如果需要)
# mqtt_client.username_pw_set("your_username", "your_password")
# 连接到MQTT Broker
mqtt_client.connect(broker_address, port, 60)
# 启动循环,保持连接并接收消息
mqtt_client.loop_start()
try:
while True:
pass
except KeyboardInterrupt:
print("Exiting...")
# 停止循环
mqtt_client.loop_stop()
# 断开连接
mqtt_client.disconnect()
# 关闭InfluxDB客户端
influx_client.close()
在上述代码中:
- 首先配置了 InfluxDB 的访问令牌token、组织名称org、存储桶名称bucket以及 URL,并创建了InfluxDBClient实例和写入 API。
- on_connect函数是 MQTT 连接成功后的回调函数,当客户端成功连接到 MQTT Broker 时,会订阅 “factory/#” 主题,该主题可以接收以 “factory/” 开头的所有子主题消息,确保能够获取到工厂中所有设备的数据。
- on_message函数是 MQTT 接收消息后的回调函数。在这个函数中,首先尝试解析接收到的 JSON 格式的消息。然后,根据消息内容构建 InfluxDB 的Point对象,将消息中的每个键值对作为字段添加到Point中。同时,从消息中获取设备 ID,并将其作为标签添加到Point中,以便在 InfluxDB 中可以根据设备 ID 对数据进行分类和查询。最后,使用 InfluxDB 的写入 API 将Point对象写入到指定的存储桶和组织中。如果消息不是有效的 JSON 格式,则打印错误信息。
- 创建了mqtt.Client()实例,并将on_connect和on_message函数分别设置为连接成功和接收消息的回调函数。然后连接到指定地址和端口的 MQTT Broker,并启动循环以保持连接并接收消息。在程序结束时,通过捕获KeyboardInterrupt异常,停止 MQTT 循环,断开与 MQTT Broker 的连接,并关闭 InfluxDB 客户端。
结果展示与分析
为了展示 InfluxDB 中存储的数据,我们可以使用 Grafana 等数据可视化工具。首先,在 Grafana 中添加 InfluxDB 作为数据源,配置好 InfluxDB 的 URL、访问令牌、组织和存储桶等信息。然后,创建一个新的仪表板,添加图表组件,通过编写 InfluxQL 查询语句来获取我们需要展示的数据。
例如,我们想要展示设备 1 在过去 24 小时内的温度变化趋势,可以编写如下 InfluxQL 查询语句 :
SELECT mean("temperature") FROM "device_measurement" WHERE "device_id" = 'device1' AND time >= now() - 24h GROUP BY time(10m)
上述查询语句的含义是:从 “device_measurement” 测量值(类似于表)中选择 “temperature” 字段的平均值,条件是 “device_id” 标签为 “device1” 且时间在过去 24 小时内,最后按照每 10 分钟的时间间隔进行分组。
通过 Grafana 的图表展示,我们可以清晰地看到设备 1 的温度随时间的变化趋势,如下图所示:
[此处插入设备 1 温度变化趋势图]
从图表中可以分析出,设备 1 在某些时间段内温度较高,可能需要关注设备的散热情况;而在其他时间段内温度较为稳定。通过对这些数据的分析,我们可以及时发现设备运行过程中可能存在的问题,提前采取措施进行维护和优化,避免设备故障对生产造成影响。同时,也可以根据这些数据对设备的性能进行评估,为设备的升级和改造提供依据。此外,还可以进一步分析不同设备之间的数据差异,找出生产效率较高的设备,总结经验并推广到其他设备上,从而提高整个工厂的生产效率和产品质量。