当前位置: 首页 > news >正文

InfluxDB 与 Python 框架结合:Django 应用案例(二)

四、集成步骤

4.1 使用 Python 客户端库连接 InfluxDB

在 Django 项目中,使用influxdb-client库来连接 InfluxDB。首先,在settings.py文件中配置 InfluxDB 的连接参数,这样便于集中管理和修改配置信息:

 

INFLUXDB_CONFIG = {

'url': 'http://localhost:8086',

'token': 'your_token',

'org': 'your_org',

'bucket': 'your_bucket'

}

在需要连接 InfluxDB 的 Python 文件中,导入相关库并进行连接操作。例如,在myapp/utils.py中:

 

from influxdb_client import InfluxDBClient

from django.conf import settings

def get_influxdb_client():

config = settings.INFLUXDB_CONFIG

client = InfluxDBClient(url=config['url'], token=config['token'], org=config['org'])

return client

在上述代码中,get_influxdb_client函数从 Django 的设置中获取 InfluxDB 的配置参数,然后使用这些参数创建并返回一个InfluxDBClient实例。其中,url指定了 InfluxDB 的服务地址,token是用于身份验证的令牌,org表示组织名,bucket则是数据存储的桶名 。通过这种方式,在 Django 项目的其他部分可以方便地调用该函数获取 InfluxDB 客户端实例,进而进行数据的读写操作。

4.2 创建自定义模型层封装逻辑

为了使代码结构更加清晰、可维护,创建一个自定义的模型层来封装 InfluxDB 的操作逻辑。在myapp/models.py中定义一个类,例如InfluxDBModel:

 

from myapp.utils import get_influxdb_client

class InfluxDBModel:

def __init__(self):

self.client = get_influxdb_client()

self.write_api = self.client.write_api()

self.query_api = self.client.query_api()

def write_data(self, measurement, tags, fields, time):

point = {

"measurement": measurement,

"tags": tags,

"fields": fields,

"time": time

}

self.write_api.write(bucket=settings.INFLUXDB_CONFIG['bucket'], org=settings.INFLUXDB_CONFIG['org'],

record=point)

def query_data(self, query):

result = self.query_api.query(org=settings.INFLUXDB_CONFIG['org'], query=query)

return result

在这个类中,__init__方法初始化了 InfluxDB 客户端以及写入和查询 API。write_data方法接受测量名称、标签、字段和时间等参数,构建数据点并将其写入 InfluxDB;query_data方法则接受一个查询语句,执行查询并返回结果。通过这样的封装,在 Django 的视图函数或其他业务逻辑中,只需要调用InfluxDBModel类的方法,而无需关心底层的 InfluxDB 连接和操作细节,提高了代码的复用性和可维护性 。例如,在视图函数中可以这样使用:

 

from myapp.models import InfluxDBModel

def my_view(request):

influx_model = InfluxDBModel()

tags = {'tag1': 'value1', 'tag2': 'value2'}

fields = {'field1': 10, 'field2': 20.5}

from datetime import datetime

time = datetime.utcnow()

influx_model.write_data('my_measurement', tags, fields, time)

query = 'from(bucket:"your_bucket") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "my_measurement")'

result = influx_model.query_data(query)

for table in result:

for record in table.records:

print(record.values)

4.3 结合 Celery 实现异步写入

在一些场景中,数据写入 InfluxDB 的操作可能会比较耗时,为了避免阻塞主线程,影响用户体验,可以引入 Celery 和消息队列(如 RabbitMQ、Redis)来实现异步写入。

首先,安装 Celery 和相关的消息队列客户端库。如果使用 Redis 作为消息队列,安装命令如下:

 

pip install celery redis

在 Django 项目的根目录下创建celery.py文件,进行 Celery 的配置:

 

import os

from celery import Celery

# 设置Django项目的默认设置模块

os.environ.setdefault('DJANGO_SETTINGS_MODULE','myproject.settings')

app = Celery('myproject')

# 从Django设置中加载Celery配置,使用CELERY_ 前缀

app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现所有注册的Django应用中的任务

app.autodiscover_tasks()

在settings.py中添加 Celery 的配置,例如:

 

CELERY_BROKER_URL ='redis://localhost:6379/0'

CELERY_RESULT_BACKEND ='redis://localhost:6379/0'

CELERY_TIMEZONE = 'Asia/Shanghai'

这里配置了使用 Redis 作为消息队列和结果存储后端,并设置了时区为上海时间。

接下来,在myapp/tasks.py中定义异步任务。例如,定义一个将数据写入 InfluxDB 的异步任务:

 

from celery import shared_task

from myapp.models import InfluxDBModel

@shared_task

def write_influxdb_task(measurement, tags, fields, time):

influx_model = InfluxDBModel()

influx_model.write_data(measurement, tags, fields, time)

在视图函数或其他业务逻辑中,调用这个异步任务,将数据写入操作放入消息队列中异步执行:

 

from myapp.tasks import write_influxdb_task

def my_view(request):

tags = {'tag1': 'value1', 'tag2': 'value2'}

fields = {'field1': 10, 'field2': 20.5}

from datetime import datetime

time = datetime.utcnow()

write_influxdb_task.delay('my_measurement', tags, fields, time)

通过delay方法,将任务发送到消息队列中,Celery 的 Worker 会从队列中获取任务并执行。这样,在处理大量数据写入时,主线程不会被阻塞,能够快速响应用户请求,提高了应用的性能和用户体验。同时,使用消息队列还可以实现任务的持久化,即使系统出现故障,任务也不会丢失,待系统恢复后会继续执行 。

五、应用案例实战

5.1 需求分析

以实时监控系统为例,该系统旨在对服务器的各项性能指标进行实时监测,以便管理员能够及时了解服务器的运行状态,及时发现并解决潜在问题。

在数据采集方面,需要收集的指标包括 CPU 使用率、内存使用率、磁盘 I/O 读写速率、网络带宽使用情况等。这些数据能够全面反映服务器的负载情况和性能表现。例如,CPU 使用率可以直接体现服务器的计算资源消耗情况,内存使用率则反映了服务器的内存资源占用状态,磁盘 I/O 读写速率影响着数据的存储和读取速度,网络带宽使用情况则关乎服务器与外部网络通信的能力 。

在展示指标上,系统需要将采集到的数据以直观的方式呈现给用户。对于 CPU 使用率和内存使用率,可以通过折线图展示其随时间的变化趋势,让管理员能够清晰地看到资源使用率的波动情况;磁盘 I/O 读写速率和网络带宽使用情况可以使用柱状图进行对比展示,方便管理员快速了解不同时间段的 I/O 和网络性能差异。同时,还需要提供实时数据的数值展示,以便管理员能够获取精确的指标数据 。

从用户交互需求来看,用户应能够自由选择查看数据的时间范围,如过去 1 小时、过去 1 天、过去 1 周等,以满足不同场景下的数据分析需求。此外,用户还希望能够对不同指标的数据进行对比分析,例如同时查看 CPU 使用率和内存使用率在某一时间段内的变化情况,从而更全面地了解服务器的性能状况 。同时,为了方便用户快速定位问题,系统还应支持数据的筛选和搜索功能,例如根据特定的时间点或服务器名称筛选数据。

5.2 数据采集与存储

在 Django 应用中,可以使用 Python 的psutil库来采集服务器的性能数据。首先,安装psutil库:

 

pip install psutil

然后,在myapp/views.py中编写数据采集和存储的逻辑代码。假设已经定义好了InfluxDBModel类用于与 InfluxDB 交互:

 

from django.http import HttpResponse

from myapp.models import InfluxDBModel

import psutil

from datetime import datetime

def collect_data(request):

influx_model = InfluxDBModel()

cpu_percent = psutil.cpu_percent(interval=1)

mem_percent = psutil.virtual_memory().percent

disk_io_read = psutil.disk_io_counters().read_bytes

disk_io_write = psutil.disk_io_counters().write_bytes

net_io_sent = psutil.net_io_counters().bytes_sent

net_io_recv = psutil.net_io_counters().bytes_recv

tags = {

'host': 'your_server_hostname'

}

fields = {

'cpu_percent': cpu_percent,

'mem_percent': mem_percent,

'disk_io_read': disk_io_read,

'disk_io_write': disk_io_write,

'net_io_sent': net_io_sent,

'net_io_recv': net_io_recv

}

time = datetime.utcnow()

influx_model.write_data('server_performance', tags, fields, time)

return HttpResponse('Data collected and stored successfully')

在上述代码中,collect_data视图函数首先获取InfluxDBModel实例。接着,使用psutil库分别获取 CPU 使用率、内存使用率、磁盘 I/O 读写字节数以及网络 I/O 收发字节数 。然后,构建包含标签、字段和时间的数据点,并调用InfluxDBModel的write_data方法将数据写入 InfluxDB。最后,返回一个成功响应给客户端 。通过这种方式,实现了在 Django 应用中对服务器性能数据的采集和存储到 InfluxDB 的操作。

5.3 数据查询与展示

在 Django 视图中从 InfluxDB 查询数据,同样使用InfluxDBModel类中的query_data方法。在myapp/views.py中添加查询数据的视图函数:

 

from django.shortcuts import render

from myapp.models import InfluxDBModel

def show_data(request):

influx_model = InfluxDBModel()

query = 'from(bucket:"your_bucket") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "server_performance")'

result = influx_model.query_data(query)

cpu_data = []

mem_data = []

time_data = []

for table in result:

for record in table.records:

values = record.values

time_data.append(values['_time'])

cpu_data.append(values['cpu_percent'])

mem_data.append(values['mem_percent'])

context = {

'cpu_data': cpu_data,

'mem_data': mem_data,

'time_data': time_data

}

return render(request,'show_data.html', context)

在这个视图函数中,首先创建InfluxDBModel实例,然后定义一个查询语句,查询过去 1 小时内server_performance测量中的数据 。执行查询后,遍历查询结果,提取时间、CPU 使用率和内存使用率数据,并分别存储到对应的列表中。最后,将这些数据作为上下文传递给show_data.html模板 。

在show_data.html模板中,使用 Echarts 库来展示数据。假设已经在模板中引入了 Echarts 库:

 

<!DOCTYPE html>

<html lang="en">

<head>

<meta charset="UTF-8">

<title>Server Performance</title>

<script src="https://cdn.jsdelivr.net/npm/echarts@5.4.1/dist/echarts.min.js"></script>

</head>

<body>

<div id="cpu_chart" style="width: 800px; height: 400px;"></div>

<div id="mem_chart" style="width: 800px; height: 400px;"></div>

<script>

var cpuData = {{ cpu_data|safe }};

var memData = {{ mem_data|safe }};

var timeData = {{ time_data|safe }};

var cpuChart = echarts.init(document.getElementById('cpu_chart'));

var memChart = echarts.init(document.getElementById('mem_chart'));

cpuChart.setOption({

title: {

text: 'CPU Usage'

},

xAxis: {

type: 'time',

data: timeData

},

yAxis: {

type: 'value',

name: 'Percentage'

},

series: [{

data: cpuData,

type: 'line'

}]

});

memChart.setOption({

title: {

text: 'Memory Usage'

},

xAxis: {

type: 'time',

data: timeData

},

yAxis: {

type: 'value',

name: 'Percentage'

},

series: [{

data: memData,

type: 'line'

}]

});

</script>

</body>

</html>

在模板中,首先从视图传递过来的上下文中获取数据,并将其转换为 JavaScript 数组。然后,使用 Echarts 初始化两个图表,分别用于展示 CPU 使用率和内存使用率随时间的变化。通过配置图表的标题、坐标轴和系列数据,实现了数据的可视化展示 。用户在浏览器中访问show_data视图对应的 URL 时,就可以看到服务器性能数据的可视化图表。

http://www.lryc.cn/news/605744.html

相关文章:

  • DoRA详解:从LoRA到权重分解的进化
  • 小杰数据结构(three day)——静以修身,俭以养德。
  • 【Linux系统】库的制作与原理
  • 【数据结构】算法代码
  • 渗透RCE
  • TS 常用类型与语法
  • Cesium 快速入门(六)实体类型介绍
  • Jmeter 性能测试常用图表、服务器资源监控
  • C语言指针(三):数组传参本质、冒泡排序与二级指针详解
  • FISCO BCOS Gin调用WeBASE-Front接口发请求
  • [硬件电路-111]:滤波的分类:模拟滤波与数字滤波; 无源滤波与有源滤波;低通、带通、带阻、高通滤波;时域滤波与频域滤波;低价滤波与高阶滤波。
  • 操作系统数据格式相关(AI回答)
  • 无人船 | 图解基于LQR控制的路径跟踪算法(以欠驱动无人艇Otter为例)
  • 学习笔记《区块链技术与应用》第4天 比特币脚本语言
  • Docker部署Seata
  • Linux核心转储(Core Dump)原理、配置与调试实践
  • 前端-移动Web-day2
  • 野生动物巡查系统(H题)--2025 年全国大学生电子设计竞赛试题
  • ENSP防火墙部署
  • 快速理解RTOS中的pendsv中断和systick中断
  • Java Stream进阶:map是“一对一”,flatMap是“一对多”
  • H.266 vs H.265/AV1/H.264:从工程落地看下一代视频系统的技术演进
  • 前端核心技术Node.js(五)——Mongodb、Mongoose和接口
  • Web3:在 VSCode 中基于 Foundry 快速构建 Solidity 智能合约本地开发环境
  • 硬核技术协同:x86 生态、机密计算与云原生等技术如何为产业数字化转型筑底赋能
  • 云原生环境 DDoS 防护:容器化架构下的流量管控与弹性应对
  • 对git 熟悉时,常用操作
  • Java学习第九十一部分——OkHttp
  • MongoDB用户认证authSource
  • 微服务架构技巧篇——接口类设计技巧