当前位置: 首页 > news >正文

使用Telegraf从工业物联网设备收集数据的完整指南

在当今的工业物联网(IIoT)环境中,从各种设备收集、处理和转发数据是一项关键任务。本文将详细介绍如何使用Telegraf这一强大的数据收集工具,从多种通信协议的IoT设备中收集数据,并将其发送到InfluxDB等数据存储系统。

Telegraf简介

Telegraf是一个开源的数据收集代理,它能够从各种资源聚合数据,使用不同的通信协议,在处理阶段转换传入数据,生成聚合指标,并将处理后的数据转发给数据消费者。其工作原理类似于提取-转换-加载(ETL)过程。

Telegraf的主要优势包括:

  • 支持广泛的设备资源集成插件
  • 开源特性
  • 灵活的插件系统(使用Go语言实现)
  • 可扩展性强,可创建自定义插件

在这里插入图片描述

物联网用例定义

在我们的示例中,我们将模拟一个工业物联网环境,包含以下设备:

  1. SNMP设备(通过SNMP协议通信)
  2. HTTP设备(通过REST API通信)
  3. MQTT设备(通过发布/订阅模式通信)
  4. OPC UA设备(通过工业标准OPC UA协议通信)

这些设备将生成随机数据,Telegraf将作为桥梁接收所有这些数据,进行转换,并将其发送到InfluxDB进行存储和可视化,同时还可以转发到Kafka等其他工具。

在这里插入图片描述

创建基础Telegraf环境

首先,我们需要设置一个基础环境,包括Telegraf和InfluxDB,使用Docker Compose进行管理。

telegraf.conf配置文件

# Agent Configuration
[agent]## Default data collection interval for all inputsinterval = "10s"# Input Plugins
[[inputs.cpu]]## no configuration options required[[inputs.mem]]## no configuration options required# Output Plugins
[[outputs.file]]## Output file pathfiles = ["stdout"]

docker-compose.yml文件

version: '3'
services:telegraf:image: telegrafvolumes:- ./telegraf.conf:/etc/telegraf/telegraf.conf:ro

对于本地安装,可以使用以下命令启动Telegraf(以MacOS为例):

telegraf --config /path/to/telegraf.conf

添加设备并扩展Telegraf配置

1. 集成SNMP设备

SNMP(简单网络管理协议)是广泛使用的网络管理协议,用于监控和管理网络设备。

SNMP Dockerfile (Dockerfile_snmp_agent):

FROM debian:latestRUN apt-get update && apt-get install -y snmp snmpdCOPY snmpd.conf /etc/snmp/snmpd.confEXPOSE 161/udpCMD ["snmpd", "-f", "-Lo", "-C", "-c", "/etc/snmp/snmpd.conf"]

snmpd.conf配置文件:

com2sec readonly default public
group MyROGroup v1 readonly
group MyROGroup v2c readonly
view all included .1 80
access MyROGroup "" any noauth exact all none none
syslocation "Location"
syscontact "Contact"

Telegraf配置:

[[inputs.snmp]]agents = ["snmp_agent"]version = 2community = "public"interval = "10s"timeout = "5s"[[inputs.snmp.field]]name = "cpu"oid = "1.3.6.1.4.1.2021.11.11.0"[[inputs.snmp.field]]name = "memory"oid = "1.3.6.1.4.1.2021.4.6.0"

2. 集成HTTP设备

我们创建一个基于Flask的REST API服务器,使用psutil库收集CPU和内存使用情况。

Flask应用(app.py):

from flask import Flask, jsonify
import psutilapp = Flask(__name__)@app.route('/metrics', methods=['GET'])
def get_metrics():cpu_percent = psutil.cpu_percent(interval=1)memory_usage = psutil.virtual_memory().percentmetrics = {'cpu_percent': cpu_percent,'memory_usage': memory_usage}return jsonify(metrics)if __name__ == '__main__':app.run(host='0.0.0.0', port=8080)

Dockerfile (Dockerfile_http_api):

FROM python:3WORKDIR /appCOPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txtCOPY app.py .CMD ["python", "app.py"]

requirements.txt:

flask
psutil

Telegraf配置:

[[inputs.http_response]]interval = "10s"name_override = "http_metrics"urls = ["http://http_api:8080/metrics"]method = "GET"response_timeout = "5s"data_format = "json"

3. 集成MQTT设备

MQTT基于发布/订阅模式,需要发布者、代理和订阅者三个组件。

MQTT发布者Python应用(dummy_sensor.py):

import paho.mqtt.client as mqtt
import psutil
import timebroker = "mqtt_broker"client = mqtt.Client()client.connect(broker)while True:cpu_percent = psutil.cpu_percent(interval=1)mem_percent = psutil.virtual_memory().percentclient.publish("sensor/cpu", cpu_percent)client.publish("sensor/mem", mem_percent)time.sleep(5)

Dockerfile (与HTTP示例类似):

FROM python:3.9WORKDIR /appCOPY dummy_sensor.py .RUN pip install paho-mqtt psutilCMD ["python", "dummy_sensor.py"]

MQTT代理配置(mosquitto.conf):

persistence true
persistence_location /mosquitto/data/log_dest file /mosquitto/log/mosquitto.log
log_type alllistener 1883
allow_anonymous true

Telegraf配置:

[[inputs.mqtt_consumer]]servers = ["tcp://mqtt_broker:1883"]topics = ["sensor/cpu/#", "sensor/mem/#"]data_format = "value"data_type = "float"

4. 集成OPC UA设备

OPC UA是工业标准通信协议,基于服务器-客户端通信机制。

OPC UA服务器Python应用(opcua_server.py):

from opcua import Server, ua
import timeserver = Server()server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")server.set_security_policy([ua.SecurityPolicyType.NoSecurity,ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,ua.SecurityPolicyType.Basic256Sha256_Sign])uri = "http://example.com"
idx = server.register_namespace(uri)node = server.get_objects_node()obj = node.add_object(idx, "MyObject1")var = obj.add_variable(idx, "MyVariable", 0.0)
var.set_writable()server.start()print("OPC UA server is running!")try:while True:value = var.get_value()print(value)value += 1.0var.set_value(value)time.sleep(5)
finally:server.stop()

Dockerfile (Dockerfile_dummy_sensor):

FROM python:3.9WORKDIR /appCOPY opcua_server.py .RUN pip install opcuaCMD ["python", "opcua_server.py"]

Telegraf配置:

[[inputs.opcua]]name = "opcua"endpoint = "opc.tcp://opcua_server:4840/freeopcua/server/"security_policy = "None"security_mode = "None"auth_method = "Anonymous"nodes = [{ name = "status", namespace = "2", identifier_type = "i", identifier = "2" }]

整合所有配置并扩展Docker Compose

将所有设备的配置整合到一个telegraf.conf文件中:

[[inputs.opcua]]name = "opcua"endpoint = "opc.tcp://opcua_server:4840/freeopcua/server/"security_policy = "None"security_mode = "None"auth_method = "Anonymous"nodes = [{ name = "status", namespace = "2", identifier_type = "i", identifier = "2" }][[inputs.http_response]]interval = "10s"name_override = "http_metrics"urls = ["http://http_api:8080/metrics"]method = "GET"response_timeout = "5s"data_format = "json"[[inputs.snmp]]agents = ["snmp_agent"]version = 2community = "public"interval = "10s"timeout = "5s"[[inputs.snmp.field]]name = "cpu"oid = "1.3.6.1.4.1.2021.11.11.0"[[inputs.snmp.field]]name = "memory"oid = "1.3.6.1.4.1.2021.4.6.0"[[inputs.mqtt_consumer]]servers = ["tcp://mqtt_broker:1883"]topics = ["sensor/cpu/#", "sensor/mem/#"]data_format = "value"data_type = "float"[[outputs.influxdb_v2]]urls = ["http://influxdb:8086"]token = "replace-with-your-own-token"organization = "test-org"bucket = "metrics"

完整的docker-compose.yml文件:

version: "3"services:http_api:build:context: .dockerfile: Dockerfile_http_apiports:- 8080:8080snmp_agent:build:context: .dockerfile: Dockerfile_snmp_agentports:- 161:161/udpdummy_sensor:build:context: .dockerfile: Dockerfile_dummy_sensordepends_on:- mqtt_brokeropcua_server:build:context: .dockerfile: Dockerfile_opcua_servermqtt_broker:image: eclipse-mosquittoports:- 1883:1883volumes:- ./mosquitto.conf:/mosquitto/config/mosquitto.conf- mosquitto_data:/mosquitto/data- mosquitto_log:/mosquitto/logtelegraf:image: telegrafvolumes:- ./telegraf.conf:/etc/telegraf/telegraf.confdepends_on:- mqtt_broker- influxdbinfluxdb:image: influxdbvolumes:- influxdb_data:/var/lib/influxdbports:- 8086:8086volumes:influxdb_data:mosquitto_data:mosquitto_log:

替换InfluxDB配置

启动docker-compose后,需要确保InfluxDB的bucket和token配置正确。可以参考之前的文章了解如何配置InfluxDB以观察和可视化传入数据。

总结

本文详细介绍了如何使用Telegraf从多种通信协议的IoT设备中收集数据。我们涵盖了:

  1. 定义物联网用例
  2. 创建基础Telegraf环境
  3. 添加各种设备(SNMP、HTTP、MQTT、OPC UA)并扩展Telegraf配置
  4. 整合所有配置并通过Docker Compose运行整个环境

Telegraf提供了丰富的插件支持多种通信协议,本文示例中还涉及了Telegraf的处理器、聚合器和输出等未详细探讨的部分。在未来的文章中,我们将进一步完善这些内容。

通过这个完整的示例,您可以轻松搭建一个工业物联网数据收集系统,为后续的数据分析和可视化奠定基础。

http://www.lryc.cn/news/590416.html

相关文章:

  • Beautiful Soup(BS4)
  • ABP VNext + EF Core 二级缓存:提升查询性能
  • AI炒作,AGI或在2080年之前也无法实现,通用人工智能AGI面临幻灭
  • 【RTSP从零实践】13、TCP传输AAC格式RTP包(RTP_over_TCP)的RTSP服务器(附带源码)
  • 50天50个小项目 (Vue3 + Tailwindcss V4) ✨ | AutoTextEffect(自动打字机)
  • 使用Whistle自定义接口返回内容:Mock流式JSON数据全解析
  • SQL性能分析
  • C# --- 单例类错误初始化 + 没有释放资源导致线程泄漏
  • 【Linux】如何使用nano创建并编辑一个文件
  • 动态规划题解_打家劫舍【LeetCode】
  • 编译原理第四到五章(知识点学习/期末复习/笔试/面试)
  • 部分排序算法的Java模拟实现(复习向,非0基础)
  • AWS ML Specialist 考试备考指南
  • 【Qt】麒麟系统安装套件
  • uniapp写好的弹窗组件
  • OWASP Top 10 攻击场景实战
  • 在 CentOS 8 上彻底卸载 Kubernetes(k8s)
  • 01 启动流程实例
  • ICMR-2025 | 杭电多智能体协作具身导航框架!MMCNav:基于MLLM的多智能体协作户外视觉语言导航
  • 钱包核心标准 BIP32、BIP39、BIP44:从助记词到多链钱包的底层逻辑
  • STM32F4踩坑小记——使用HAL库函数进入HardFault
  • 蓝光三维扫描技术:手机闪光灯模块全尺寸3D检测的高效解决方案
  • HTML基础知识 二(创建容器和表格)
  • 在虚拟环境中复现论文(环境配置)
  • Class<T> 类传递及泛型数组
  • SSH连接复用技术在海外云服务器环境下的稳定性验证与优化方案
  • 动态规划的核心性质——最优化原理 (Principle of Optimality)
  • git的diff命令、Config和.gitignore文件
  • Python编程基础(六)| 用户输入和while循环
  • slurm设置用户节点和分区权限