当前位置: 首页 > news >正文

Linux服务器磁盘及内存用量监控Python脚本(推送钉钉群通知)

文章目录

  • Python 脚本
  • 钉钉推送通知
  • 定时任务

Python 脚本

# -*- coding: utf-8 -*-
import subprocessdef get_disk_usage():# 执行 df 命令获取磁盘使用情况df_process = subprocess.Popen(['df', '-h', '/'], stdout=subprocess.PIPE)output, _ = df_process.communicate()output = output.decode('utf-8')# 解析输出,获取磁盘总量和已使用占比total_space = Noneused_percentage = Nonelines = output.split('\n')for line in lines:if line.startswith('/dev'):parts = line.split()total_space = parts[1]used_percentage = parts[4]breakprint(f"磁盘总量: {total_space}, 已使用: {used_percentage}")return total_space, used_percentagedef check_disk_full(used_percentage, threshold_percent=90):# 检查磁盘是否快满了if used_percentage is not None:used_percentage = int(used_percentage[:-1])  # 去掉百分号并转换为整数if used_percentage >= threshold_percent:return Truereturn Falsedef get_memory_usage():try:# 执行 free 命令获取内存信息free_process = subprocess.Popen(['free', '-h'], stdout=subprocess.PIPE)output, _ = free_process.communicate()output = output.decode('utf-8')# 解析输出,获取内存总量和已使用占比lines = output.split('\n')for line in lines:if line.startswith('Mem:'):parts = line.split()total_memory = parts[1]  # 内存总量used_memory = parts[2]   # 已使用内存print(f"内存总量: {total_memory}, 已使用: {used_memory}")return total_memory, used_memoryreturn None, Noneexcept Exception as e:print(f"Error: {e}")return None, Nonedef check_memory_insufficient(total_memory, used_memory, threshold_percent=90):if total_memory is not None and used_memory is not None:# 解析内存值,去除单位,并将字符串转换为数值total_memory_value = float(total_memory[:-1])used_memory_value = float(used_memory[:-1])# 检查是否内存不足used_percentage = (used_memory_value / total_memory_value) * 100if used_percentage >= threshold_percent:return Truereturn Falseif __name__ == "__main__":# 获取磁盘使用情况total_space, used_percentage = get_disk_usage()if total_space and used_percentage:# 检查磁盘是否快满了(阈值默认为90%)if check_disk_full(used_percentage, threshold_percent=90):print("磁盘快满了!")else:print("未能获取磁盘使用情况。")# 获取内存使用情况total_memory, used_memory = get_memory_usage()if total_memory is not None and used_memory is not None:# 检查是否内存不足(默认阈值为90%)if check_memory_insufficient(total_memory, used_memory, threshold_percent=90):print("内存不足!")else:print("未能获取内存使用情况。")
  • 输出结果
磁盘总量: 36G, 已使用: 65%
内存总量: 4.7G, 已使用: 1.0G

钉钉推送通知

  • 钉钉自定义机器人Python脚本推送通知
  • 钉钉推送配置与阈值
- ding-talk:secret: 'xxx'access-token: 'xxx'
- project:name: '项目名称'disk-threshold: 80memory-threshold: 90
  • Python
pip3 install pyyaml
pip3 install requests
# -*- coding: utf-8 -*-
import subprocess
import yaml
import time
import hashlib
import base64
import hmac
import requests
from urllib.parse import quote
import socketdef get_disk_usage():# 执行 df 命令获取磁盘使用情况df_process = subprocess.Popen(['df', '-h', '/'], stdout=subprocess.PIPE)output, _ = df_process.communicate()output = output.decode('utf-8')# 解析输出,获取磁盘总量和已使用占比total_space = Noneused_percentage = Nonelines = output.split('\n')for line in lines:if line.startswith('/dev'):parts = line.split()total_space = parts[1]used_percentage = parts[4]breakprint(f"磁盘总量: {total_space}, 已使用: {used_percentage}")return total_space, used_percentagedef check_disk_full(used_percentage, threshold_percent=90):# 检查磁盘是否快满了if used_percentage is not None:used_percentage = int(used_percentage[:-1])  # 去掉百分号并转换为整数if used_percentage >= threshold_percent:return Truereturn Falsedef get_memory_usage():try:# 执行 free 命令获取内存信息free_process = subprocess.Popen(['free', '-h'], stdout=subprocess.PIPE)output, _ = free_process.communicate()output = output.decode('utf-8')# 解析输出,获取内存总量和已使用占比lines = output.split('\n')for line in lines:if line.startswith('Mem:'):parts = line.split()total_memory = parts[1]  # 内存总量used_memory = parts[2]   # 已使用内存print(f"内存总量: {total_memory}, 已使用: {used_memory}")return total_memory, used_memoryreturn None, Noneexcept Exception as e:print(f"Error: {e}")return None, Nonedef check_memory_insufficient(total_memory, used_memory, threshold_percent=90):if total_memory is not None and used_memory is not None:# 解析内存值,去除单位,并将字符串转换为数值total_memory_value = float(total_memory[:-1])used_memory_value = float(used_memory[:-1])# 检查是否内存不足used_percentage = (used_memory_value / total_memory_value) * 100if used_percentage >= threshold_percent:return Truereturn Falsedef read_yaml(file_path):with open(file_path, 'r', encoding='utf-8') as file:try:data = yaml.safe_load(file)return dataexcept yaml.YAMLError as e:print(f"读取 YAML 文件时出错:{e}")return Nonedef get_local_ip():try:# 创建一个 UDP 套接字sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)sock.connect(('8.8.8.8', 80))  # 连接 Google DNS# 获取本地 IP 地址local_ip = sock.getsockname()[0]return local_ipexcept Exception as e:print(f"Error: {e}")return Nonedef dingTalkSign(dingTalkSecret):# 获取当前时间戳,并将其转换为毫秒级timestamp = int(time.time() * 1000)# 将时间戳和钉钉应用的密钥拼接在一起,将拼接后的字符串转换为字节数组signBefore = ('%s\n%s' % (timestamp, dingTalkSecret)).encode('utf-8')# 用HMAC-SHA256算法对字节数组进行签名hsha256 = hmac.new(dingTalkSecret.encode('utf-8'), signBefore, hashlib.sha256)# 将签名进行Base64编码,将编码后的签名进行URL编码sign = quote(base64.b64encode(hsha256.digest()))return {"timestamp": timestamp, "sign": sign}def sendMessage(dingTalkUrl='', dingTalkSecret=None, message='', atMobiles=[], isAtAll=False):print("发送内容:", message, atMobiles, isAtAll)json = {"msgtype": "text","text": {"content": message,},"at": {"atMobiles": atMobiles,"isAtAll": isAtAll}}sign = dingTalkSign(dingTalkSecret)response = requests.post(url=dingTalkUrl, params=sign, json=json)print("响应内容:", response.json())if __name__ == "__main__":local_ip = get_local_ip()file_data = read_yaml("config.yml")if file_data:for entry in file_data:if 'ding-talk' in entry:dingTalkSecret = entry['ding-talk']['secret']access_token = entry['ding-talk']['access-token']if dingTalkSecret is None:print("未配置钉钉机器人密钥")if access_token is None:print("未配置钉钉机器人Token")else:# 自定义机器人推送地址dingTalkUrl = f"https://oapi.dingtalk.com/robot/send?access_token={access_token}"if dingTalkSecret is not None and 'project' in entry:project_name = entry['project']['name']disk_threshold = entry['project']['disk-threshold']memory_threshold = entry['project']['memory-threshold']# 获取磁盘使用情况total_space, used_percentage = get_disk_usage()total_memory, used_memory = get_memory_usage()if (total_space and used_percentage) or (total_memory and used_memory):# 检查磁盘是否快满了(阈值默认为90%)if check_disk_full(used_percentage, threshold_percent=disk_threshold):sendMessage(dingTalkSecret=dingTalkSecret, dingTalkUrl=dingTalkUrl, isAtAll=True,message=f'项目:{project_name}\n内网:{local_ip}\n磁盘:{total_space} / {used_percentage} (总/已用)\n内存:{total_memory} / {used_memory} (总/已用)\n磁盘不足,请及时扩容!')# 检查是否内存不足(默认阈值为90%)if check_memory_insufficient(total_memory, used_memory, threshold_percent=memory_threshold):sendMessage(dingTalkSecret=dingTalkSecret, dingTalkUrl=dingTalkUrl, isAtAll=True,message=f'项目:{project_name}\n内网:{local_ip}\n磁盘:{total_space} / {used_percentage} (总/已用)\n内存:{total_memory} / {used_memory} (总/已用)\n内存不足,请及时扩容!')else:print("未能获取磁盘使用情况。")

定时任务

  • 在线生成CRON:https://tool.lu/crontab
# 查看python3安装位置
which python3
# 添加定时任务
crontab -e
# 定时执行Python脚本,根据自己需求配置执行时间
20 9 * * * /usr/bin/python3 /u01/setup.py
http://www.lryc.cn/news/310235.html

相关文章:

  • Android13 Audio框架
  • kafka消费者接收不到消息
  • Python如何从SQL Server存取数据?
  • 学校机房Dev c++解决中文乱码问题
  • 基于java+springboot景区行李寄存管理系统设计和实现
  • 03-grafana的下拉列表选项制作-grafana的变量
  • Linux网络编程—— IO多路复用
  • C++进阶(二) 多态
  • 【C++】set、multiset与map、multimap的使用
  • Matlab/simulink微电网的PQ控制和下垂控制无缝切换建模仿真
  • 外包干了6个月,技术退步明显
  • 3. springboot中集成部署vue3
  • 问题
  • #WEB前端
  • c语言经典测试题9
  • 3d 舞蹈同步
  • win环境nginx实战配置详解
  • 数字化转型导师坚鹏:如何制定证券公司数字化转型年度培训规划
  • 新王炸:文生视频Sora模型发布,能否引爆AI芯片热潮
  • 代码随想录算法训练营|day48
  • 架构面试题汇总:并发和锁(三)
  • 蓝桥杯(3.2)
  • [数据集][目标检测]鸟类检测数据集VOC+YOLO格式11758张200类别
  • YOLOv9:使用可编程梯度信息学习您想学习的内容
  • uniapp:使用DCloud的uni-push推送消息通知(在线模式)java实现
  • 【简说八股】面试官:你知道什么是AOP么?
  • ASUS华硕天选5笔记本电脑FX607JV原装出厂Win11系统下载
  • Unity(第二十一部)动画的基础了解(感觉不了解其实也行)
  • 写时复制简介
  • 运行Python文件时出现‘utf-8’code can‘t decode byte 如何解决?(如图)