当前位置: 首页 > news >正文

Python 自动化运维与DevOps实践

https://www.python.org/static/community_logos/python-logo-master-v3-TM.png

基础设施即代码(IaC)

使用Fabric执行远程命令

python

复制

下载

from fabric import Connectiondef deploy_app():# 连接到远程服务器with Connection('web-server.example.com', user='deploy', connect_kwargs={"key_filename": "/path/to/key.pem"}) as c:# 更新代码c.run('cd /var/www/myapp && git pull origin master')# 安装依赖c.run('cd /var/www/myapp && pip install -r requirements.txt')# 重启服务c.sudo('systemctl restart myapp', pty=True)# 检查状态result = c.run('systemctl status myapp', hide=True)print(f"服务状态:\n{result.stdout}")if __name__ == '__main__':deploy_app()

Ansible Python API

python

复制

下载

from ansible.module_utils.basic import AnsibleModule
import subprocessdef run_ansible_playbook(playbook_path):try:result = subprocess.run(['ansible-playbook', playbook_path],check=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True)print("Playbook执行成功:")print(result.stdout)except subprocess.CalledProcessError as e:print("Playbook执行失败:")print(e.stderr)# 自定义Ansible模块示例
def configure_nginx(module):# 获取参数server_name = module.params['server_name']root_path = module.params['root_path']# 生成Nginx配置config = f"""server {{listen 80;server_name {server_name};root {root_path};location / {{try_files $uri $uri/ =404;}}}}"""# 写入配置文件try:with open(f'/etc/nginx/sites-available/{server_name}', 'w') as f:f.write(config)# 创建符号链接subprocess.run(['ln', '-sf', f'/etc/nginx/sites-available/{server_name}', f'/etc/nginx/sites-enabled/{server_name}'], check=True)# 测试并重载Nginxsubprocess.run(['nginx', '-t'], check=True)subprocess.run(['systemctl', 'reload', 'nginx'], check=True)module.exit_json(changed=True, msg="Nginx配置更新成功")except Exception as e:module.fail_json(msg=f"配置失败: {str(e)}")if __name__ == '__main__':run_ansible_playbook('deploy.yml')

监控与告警系统

Prometheus自定义指标

python

复制

下载

from prometheus_client import start_http_server, Gauge
import psutil
import time# 创建指标
CPU_USAGE = Gauge('system_cpu_percent', 'CPU使用百分比')
MEMORY_USAGE = Gauge('system_memory_percent', '内存使用百分比')
DISK_USAGE = Gauge('system_disk_percent', '磁盘使用百分比')def collect_metrics():while True:# 收集CPU使用率CPU_USAGE.set(psutil.cpu_percent())# 收集内存使用率MEMORY_USAGE.set(psutil.virtual_memory().percent)# 收集磁盘使用率DISK_USAGE.set(psutil.disk_usage('/').percent)time.sleep(5)if __name__ == '__main__':# 启动指标服务器start_http_server(8000)print("Prometheus指标服务器已启动,端口8000")collect_metrics()

告警规则与通知

python

复制

下载

import smtplib
from email.mime.text import MIMEText
from datetime import datetimeclass AlertManager:def __init__(self, thresholds):self.thresholds = thresholdsself.alert_history = {}def check_metrics(self, metrics):alerts = []current_time = datetime.now()# CPU检查if metrics['cpu'] > self.thresholds['cpu']:alert_key = 'high_cpu'if self._should_alert(alert_key, current_time):alerts.append(f"CPU使用率过高: {metrics['cpu']}%")# 内存检查if metrics['memory'] > self.thresholds['memory']:alert_key = 'high_memory'if self._should_alert(alert_key, current_time):alerts.append(f"内存使用率过高: {metrics['memory']}%")# 磁盘检查if metrics['disk'] > self.thresholds['disk']:alert_key = 'high_disk'if self._should_alert(alert_key, current_time):alerts.append(f"磁盘使用率过高: {metrics['disk']}%")return alertsdef _should_alert(self, alert_key, current_time):# 防止告警风暴,同一问题5分钟内不重复告警last_alert = self.alert_history.get(alert_key)if last_alert and (current_time - last_alert).seconds < 300:return Falseself.alert_history[alert_key] = current_timereturn Truedef send_email_alert(self, to_addr, subject, body):msg = MIMEText(body)msg['Subject'] = subjectmsg['From'] = 'alert@example.com'msg['To'] = to_addrtry:with smtplib.SMTP('smtp.example.com', 587) as server:server.starttls()server.login('user', 'password')server.send_message(msg)print("告警邮件发送成功")except Exception as e:print(f"发送告警邮件失败: {str(e)}")# 使用示例
thresholds = {'cpu': 90, 'memory': 85, 'disk': 90}
alert_manager = AlertManager(thresholds)metrics = {'cpu': 95, 'memory': 80, 'disk': 92}
alerts = alert_manager.check_metrics(metrics)if alerts:alert_manager.send_email_alert('admin@example.com','系统告警通知','\n'.join(alerts))

日志管理与分析

ELK日志处理管道

python

复制

下载

import logging
from pythonjsonlogger import jsonlogger
from logging.handlers import RotatingFileHandler
import logstashdef setup_logging():# 创建loggerlogger = logging.getLogger('app')logger.setLevel(logging.INFO)# JSON格式formatter = jsonlogger.JsonFormatter('%(asctime)s %(levelname)s %(name)s %(message)s')# 文件处理器file_handler = RotatingFileHandler('/var/log/app/app.log',maxBytes=10*1024*1024,  # 10MBbackupCount=5)file_handler.setFormatter(formatter)logger.addHandler(file_handler)# Logstash处理器logstash_handler = logstash.LogstashHandler('logstash.example.com',5044,version=1)logger.addHandler(logstash_handler)# 控制台处理器console_handler = logging.StreamHandler()console_handler.setFormatter(formatter)logger.addHandler(console_handler)return logger# 使用示例
logger = setup_logging()
logger.info("应用启动", extra={'user': 'admin', 'module': 'startup'})
try:1 / 0
except Exception as e:logger.error("发生错误", exc_info=True, extra={'context': 'division'})

日志分析脚本

python

复制

下载

import pandas as pd
import re
from collections import Counterdef analyze_logs(log_file):# 读取日志文件logs = []with open(log_file, 'r') as f:for line in f:try:log = json.loads(line)logs.append(log)except json.JSONDecodeError:continue# 转换为DataFramedf = pd.DataFrame(logs)# 分析错误级别print("\n错误级别分布:")print(df['levelname'].value_counts())# 提取并统计错误消息error_messages = df[df['levelname'] == 'ERROR']['message']print("\n最常见错误消息:")print(error_messages.value_counts().head(5))# 提取HTTP状态码df['status_code'] = df['message'].str.extract(r'status code (\d{3})')if 'status_code' in df.columns:print("\nHTTP状态码分布:")print(df['status_code'].value_counts())# 分析时间模式df['hour'] = pd.to_datetime(df['asctime']).dt.hourprint("\n按小时分布的日志量:")print(df['hour'].value_counts().sort_index())if __name__ == '__main__':analyze_logs('/var/log/app/app.log')

配置管理

使用Python管理配置文件

python

复制

下载

import configparser
import yaml
import json
from typing import Dict, Anyclass ConfigManager:def __init__(self):self.configs = {}def load_ini(self, filepath: str) -> Dict[str, Any]:"""加载INI配置文件"""config = configparser.ConfigParser()config.read(filepath)self.configs['ini'] = {s: dict(config.items(s)) for s in config.sections()}return self.configs['ini']def load_yaml(self, filepath: str) -> Dict[str, Any]:"""加载YAML配置文件"""with open(filepath, 'r') as f:self.configs['yaml'] = yaml.safe_load(f)return self.configs['yaml']def load_json(self, filepath: str) -> Dict[str, Any]:"""加载JSON配置文件"""with open(filepath, 'r') as f:self.configs['json'] = json.load(f)return self.configs['json']def get_value(self, config_type: str, *keys) -> Any:"""获取嵌套配置值"""current = self.configs.get(config_type, {})for key in keys:if isinstance(current, dict) and key in current:current = current[key]else:return Nonereturn currentdef update_config(self, config_type: str, updates: Dict[str, Any]):"""更新配置"""if config_type in self.configs:self.configs[config_type].update(updates)def save_config(self, config_type: str, filepath: str):"""保存配置到文件"""if config_type not in self.configs:raise ValueError(f"未加载的配置类型: {config_type}")if config_type == 'ini':config = configparser.ConfigParser()for section, options in self.configs['ini'].items():config[section] = optionswith open(filepath, 'w') as f:config.write(f)elif config_type == 'yaml':with open(filepath, 'w') as f:yaml.dump(self.configs['yaml'], f)elif config_type == 'json':with open(filepath, 'w') as f:json.dump(self.configs['json'], f, indent=2)# 使用示例
config_manager = ConfigManager()
config_manager.load_yaml('config.yaml')
db_host = config_manager.get_value('yaml', 'database', 'host')
print(f"数据库主机: {db_host}")# 更新配置
config_manager.update_config('yaml', {'database': {'host': 'new.db.example.com'}})
config_manager.save_config('yaml', 'config_updated.yaml')

容器化与编排

Docker SDK for Python

python

复制

下载

import docker
from docker.errors import DockerExceptionclass DockerManager:def __init__(self):try:self.client = docker.from_env()print("Docker连接成功")except DockerException as e:print(f"连接Docker失败: {str(e)}")raisedef list_containers(self, all=False):"""列出容器"""return self.client.containers.list(all=all)def run_container(self, image, command=None, detach=True, **kwargs):"""运行容器"""return self.client.containers.run(image,command=command,detach=detach,**kwargs)def build_image(self, path, tag, dockerfile='Dockerfile'):"""构建镜像"""return self.client.images.build(path=path,tag=tag,dockerfile=dockerfile)def cleanup_containers(self):"""清理停止的容器"""stopped_containers = self.list_containers(all=True, filters={'status': 'exited'})for container in stopped_containers:print(f"删除容器: {container.id}")container.remove()def cleanup_images(self, dangling=True):"""清理未使用的镜像"""filters = {'dangling': True} if dangling else Noneunused_images = self.client.images.list(filters=filters)for image in unused_images:print(f"删除镜像: {image.tags}")self.client.images.remove(image.id)# 使用示例
docker_manager = DockerManager()
print("运行中的容器:")
for container in docker_manager.list_containers():print(f" - {container.name}: {container.status}")# 运行新容器
nginx = docker_manager.run_container('nginx:latest',ports={'80/tcp': 8080},name='web-server'
)
print(f"启动容器: {nginx.name}")

Kubernetes Python客户端

python

复制

下载

from kubernetes import client, configclass KubernetesManager:def __init__(self, in_cluster=False):if in_cluster:config.load_incluster_config()else:config.load_kube_config()self.core_v1 = client.CoreV1Api()self.apps_v1 = client.AppsV1Api()def list_pods(self, namespace='default'):"""列出Pod"""return self.core_v1.list_namespaced_pod(namespace)def create_deployment(self, name, image, replicas=1, namespace='default'):"""创建Deployment"""deployment = client.V1Deployment(metadata=client.V1ObjectMeta(name=name),spec=client.V1DeploymentSpec(replicas=replicas,selector={'matchLabels': {'app': name}},template=client.V1PodTemplateSpec(metadata=client.V1ObjectMeta(labels={'app': name}),spec=client.V1PodSpec(containers=[client.V1Container(name=name,image=image)]))))return self.apps_v1.create_namespaced_deployment(namespace=namespace,body=deployment)def create_service(self, name, port, target_port, namespace='default', service_type='LoadBalancer'):"""创建Service"""service = client.V1Service(metadata=client.V1ObjectMeta(name=name),spec=client.V1ServiceSpec(selector={'app': name},ports=[client.V1ServicePort(port=port,target_port=target_port)],type=service_type))return self.core_v1.create_namespaced_service(namespace=namespace,body=service)# 使用示例
k8s_manager = KubernetesManager()print("集群中的Pod:")
pods = k8s_manager.list_pods()
for pod in pods.items:print(f" - {pod.metadata.name}: {pod.status.phase}")# 部署应用
deployment = k8s_manager.create_deployment('myapp','myapp:1.0',replicas=3
)
service = k8s_manager.create_service('myapp-service',port=80,target_port=8080
)
print(f"已部署: {deployment.metadata.name}")
print(f"已创建服务: {service.metadata.name}")

CI/CD流水线

GitLab CI Python脚本

python

复制

下载

import gitlab
import requests
import timeclass GitLabCICD:def __init__(self, gitlab_url, private_token, project_id):self.gl = gitlab.Gitlab(gitlab_url, private_token=private_token)self.project = self.gl.projects.get(project_id)def trigger_pipeline(self, branch='master', variables=None):"""触发CI/CD流水线"""pipeline = self.project.pipelines.create({'ref': branch,'variables': variables or {}})print(f"已触发流水线: {pipeline.id}")return pipelinedef wait_for_pipeline(self, pipeline, timeout=1800, interval=30):"""等待流水线完成"""start_time = time.time()while time.time() - start_time < timeout:pipeline.refresh()if pipeline.status in ['success', 'failed', 'canceled', 'skipped']:print(f"流水线状态: {pipeline.status}")return pipelineprint(f"等待中... 当前状态: {pipeline.status}")time.sleep(interval)raise TimeoutError("等待流水线超时")def get_job_logs(self, pipeline, job_name):"""获取作业日志"""for job in pipeline.jobs.list():if job.name == job_name:return job.trace().decode('utf-8')return Nonedef deploy_to_environment(self, environment, version):"""部署到指定环境"""# 触发部署流水线pipeline = self.trigger_pipeline(branch='master',variables={'DEPLOY_ENV': environment,'APP_VERSION': version})# 等待部署完成pipeline = self.wait_for_pipeline(pipeline)if pipeline.status == 'success':print(f"成功部署 {version} 到 {environment}")return Trueelse:logs = self.get_job_logs(pipeline, 'deploy')print(f"部署失败. 作业日志:\n{logs}")return False# 使用示例
ci_cd = GitLabCICD(gitlab_url='https://gitlab.example.com',private_token='your-private-token',project_id=12345
)# 部署新版本
ci_cd.deploy_to_environment('production', '1.2.0')

GitHub Actions Python SDK

python

复制

下载

import os
from github import Github
from base64 import b64encodeclass GitHubActionsManager:def __init__(self, token, repo_name):self.gh = Github(token)self.repo = self.gh.get_repo(repo_name)def create_workflow(self, workflow_name, workflow_content):"""创建或更新工作流"""workflow_path = f".github/workflows/{workflow_name}.yml"try:# 检查工作流是否存在contents = self.repo.get_contents(workflow_path)# 更新现有工作流self.repo.update_file(workflow_path,f"Update {workflow_name}",workflow_content,contents.sha)print(f"已更新工作流: {workflow_name}")except Exception as e:# 创建新工作流self.repo.create_file(workflow_path,f"Create {workflow_name}",workflow_content)print(f"已创建工作流: {workflow_name}")def trigger_workflow_dispatch(self, workflow_name, ref='master', inputs=None):"""手动触发工作流"""workflow = self.repo.get_workflow(f"{workflow_name}.yml")workflow.create_dispatch(ref, inputs=inputs or {})print(f"已触发工作流: {workflow_name}")def get_workflow_runs(self, workflow_name, branch=None):"""获取工作流运行记录"""workflow = self.repo.get_workflow(f"{workflow_name}.yml")runs = workflow.get_runs(branch=branch)return list(runs)def download_workflow_logs(self, run_id, output_dir='logs'):"""下载工作流日志"""run = self.repo.get_workflow_run(run_id)jobs = run.jobs()if not os.path.exists(output_dir):os.makedirs(output_dir)for job in jobs:log = job.logs()log_file = os.path.join(output_dir, f"{run_id}_{job.name}.log")with open(log_file, 'w') as f:f.write(log)print(f"已保存日志: {log_file}")# 使用示例
actions_mgr = GitHubActionsManager(token=os.getenv('GITHUB_TOKEN'),repo_name='your-username/your-repo'
)# 定义CI工作流
ci_workflow = """
name: Python CIon: [push, pull_request]jobs:build:runs-on: ubuntu-lateststeps:- uses: actions/checkout@v2- name: Set up Pythonuses: actions/setup-python@v2with:python-version: '3.9'- name: Install dependenciesrun: |python -m pip install --upgrade pippip install -r requirements.txt- name: Run testsrun: |pytest
"""# 创建/更新工作流
actions_mgr.create_workflow("python-ci", ci_workflow)# 手动触发工作流
actions_mgr.trigger_workflow_dispatch("python-ci",inputs={"environment": "staging"}
)

安全自动化

漏洞扫描集成

python

复制

下载

import subprocess
import json
from datetime import datetimeclass SecurityScanner:def __init__(self, output_dir='reports'):self.output_dir = output_dirif not os.path.exists(self.output_dir):os.makedirs(self.output_dir)def run_dependency_scan(self, project_path):"""运行依赖项漏洞扫描"""report_file = os.path.join(self.output_dir,f"dependency_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")try:result = subprocess.run(['safety', 'check', '--json', '--output', report_file],cwd=project_path,capture_output=True,text=True)if result.returncode == 0:print("未发现漏洞依赖")return Trueelse:with open(report_file, 'r') as f:vulnerabilities = json.load(f)print(f"发现 {len(vulnerabilities)} 个漏洞:")for vuln in vulnerabilities:print(f" - {vuln['package_name']} {vuln['analyzed_version']}: {vuln['advisory']}")return Falseexcept Exception as e:print(f"依赖扫描失败: {str(e)}")return Falsedef run_code_scan(self, project_path):"""运行静态代码分析"""report_file = os.path.join(self.output_dir,f"code_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")try:result = subprocess.run(['bandit', '-r', '.', '-f', 'json', '-o', report_file],cwd=project_path,capture_output=True,text=True)with open(report_file, 'r') as f:report = json.load(f)if report['metrics']['_totals']['issues'] == 0:print("未发现安全问题")return Trueelse:print(f"发现 {report['metrics']['_totals']['issues']} 个安全问题:")for issue in report['results']:print(f" - {issue['issue_text']} (严重性: {issue['issue_severity']})")return Falseexcept Exception as e:print(f"代码扫描失败: {str(e)}")return Falsedef generate_report(self):"""生成安全报告"""reports = []for filename in os.listdir(self.output_dir):if filename.endswith('.json'):with open(os.path.join(self.output_dir, filename), 'r') as f:reports.append({'filename': filename,'content': json.load(f)})html_report = """<html><head><title>安全扫描报告</title></head><body><h1>安全扫描报告</h1><p>生成时间: {}</p>""".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))for report in reports:html_report += f"<h2>{report['filename']}</h2>"html_report += "<pre>" + json.dumps(report['content'], indent=2) + "</pre>"html_report += "</body></html>"report_path = os.path.join(self.output_dir, 'security_report.html')with open(report_path, 'w') as f:f.write(html_report)print(f"报告已生成: {report_path}")return report_path# 使用示例
scanner = SecurityScanner()
project_path = '/path/to/your/project'# 运行扫描
dep_scan_ok = scanner.run_dependency_scan(project_path)
code_scan_ok = scanner.run_code_scan(project_path)# 生成报告
if not dep_scan_ok or not code_scan_ok:report_path = scanner.generate_report()print("发现安全问题,请查看报告")

自动化安全加固

python

复制

下载

import os
import platform
import subprocessclass SystemHardener:def __init__(self):self.system = platform.system().lower()self.hardening_actions = []def check_permissions(self, path, recommended_mode):"""检查文件权限"""current_mode = oct(os.stat(path).st_mode & 0o777)if current_mode != recommended_mode:self.hardening_actions.append(f"chmod {recommended_mode} {path}")return Falsereturn Truedef check_ssh_config(self):"""检查SSH配置"""sshd_config = '/etc/ssh/sshd_config'if not os.path.exists(sshd_config):return Truewith open(sshd_config, 'r') as f:content = f.read()checks = {'PermitRootLogin': 'no','PasswordAuthentication': 'no','X11Forwarding': 'no'}for param, expected_value in checks.items():if f"{param} {expected_value}" not in content:self.hardening_actions.append(f"echo '{param} {expected_value}' >> {sshd_config}")def apply_hardening(self, dry_run=False):"""应用安全加固"""if not self.hardening_actions:print("系统已符合安全标准")returnprint("需要执行以下加固操作:")for action in self.hardening_actions:print(f" - {action}")if dry_run:returnconfirm = input("确认执行这些操作吗? (y/n): ")if confirm.lower() == 'y':for action in self.hardening_actions:try:if action.startswith('echo'):# 处理追加配置的情况cmd, redirection = action.split('>>')param = cmd.strip().split('echo ')[1]with open(redirection.strip(), 'a') as f:f.write(f"\n{param}\n")else:subprocess.run(action, shell=True, check=True)print(f"执行成功: {action}")except subprocess.CalledProcessError as e:print(f"执行失败: {action} - {str(e)}")print("安全加固完成")else:print("操作已取消")# 使用示例
hardener = SystemHardener()# 检查关键文件权限
hardener.check_permissions('/etc/passwd', '0o644')
hardener.check_permissions('/etc/shadow', '0o640')# 检查SSH配置
hardener.check_ssh_config()# 应用加固(测试运行)
hardener.apply_hardening(dry_run=True)# 实际应用加固
# hardener.apply_hardening()

结语与学习路径

https://www.python.org/static/community_logos/python-powered-h-140x182.png

通过这十篇系列教程,你已经掌握了:

  1. 基础设施即代码(IaC)实践

  2. 监控告警系统构建

  3. 日志管理与分析技术

  4. 配置管理最佳实践

  5. 容器化与编排技术

  6. CI/CD流水线实现

  7. 安全自动化与加固

进阶学习方向

  1. 云原生技术栈

    • 深入Kubernetes Operator开发

    • 服务网格(Service Mesh)实现

    • 无服务器架构(Serverless)

  2. 性能优化

    • 分布式系统性能调优

    • 大规模集群管理

    • 高可用架构设计

  3. 安全专业领域

    • 渗透测试与红队技术

    • 零信任架构实现

    • 合规性自动化检查

  4. 认证体系

    • AWS/Azure/GCP云认证

    • Certified Kubernetes Administrator

    • HashiCorp认证工程师

Python在自动化运维和DevOps领域的应用日益广泛,持续学习和实践将助你成为高效能的技术专家!

http://www.lryc.cn/news/572137.html

相关文章:

  • JVM(7)——详解标记-整理算法
  • 基于YOLOv10算法的交通信号灯检测与识别
  • RTSP播放器低延迟实践:一次对毫秒级响应的技术探索
  • 从零开始的云计算生活——第二十天,脚踏实地,SSH与Rsync服务
  • ThinkPHP结合使用PHPConsole向Chrome 控制台输出SQL
  • 计算机网络 网络层:数据平面(一)
  • ​ CATIA V5与3DEXPERIENCE协同设计:引领无人机行业新纪元
  • 【无人机实时拼图框架(正射影像)论文翻译】OpenREALM: Real-time Mapping for Unmanned Aerial Vehicles
  • 14.8 AI写作核心技术解析:四阶段分层提示工程如何实现从大纲到风格的全流程优化
  • RPC - Response模块
  • 2025年通信安全员考试题库及答案
  • JVM(6)——详解标记-清除算法
  • 安卓vscodeAI开发实例
  • 安卓JetPack篇——Livadata
  • 无人机电机模块技术分析
  • Vue-11-前端框架Vue之应用基础父组件传值到子组件props的使用
  • 破局基建困局:国有平台公司数字化转型的生态重构
  • Spring Boot 集成 Elasticsearch(含 ElasticsearchRestTemplate 示例)
  • 华为网路设备学习-25(路由器OSPF - 特性专题 二)
  • CSS语法中的选择器与属性详解
  • day42-硬件学习之温度传感器及(ARM体系架构)
  • AR/VR显示为何视场受限?OAS对标波导案例来解决
  • 【跨界新视野】信号处理遇上VR/AR:下一代沉浸体验的核心技术与您的发表蓝海
  • C++实现异步(重叠)管道通信
  • 【MySQL基础】MySQL内置函数全面解析:提升你的数据库操作效率
  • ③-1实现 FastAdmin 默认开启通用搜索功能的方法
  • 教学的新革命!大模型生成讲解,Manim 打造动画视频
  • 【MySQL】SQL基础
  • 50-Oracle awr报告生成-实操
  • 关于AB PLC的ethernet/IP 通信 c++搭建