当前位置: 首页 > news >正文

Python 实现多服务器并发启动 SDK-C Master 与 Viewer 的分布式方案

前一篇文章:https://blog.csdn.net/zhang_jiamin/article/details/149199519?spm=1011.2415.3001.5331
介绍了在一台服务器上并发启动master,另一台服务器上并发启动viewer。

这次介绍一下使用python实现分布式在多台服务器上并发启动master和viewer,用以实现更高的并发和负载。

远程 SSH 连接 EC2的python工具

Paramiko

安装

pip install paramiko

使用

import paramikohosts = ['70.xxx.250.xxx', '55.xx.222.xx']for host in hosts:with paramiko.SSHClient() as ssh:ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())ssh.connect(hostname=host, username='ec2-user', key_filename='qa.pem')ssh.exec_command("touch 222.txt")

注:访问EC2的证书放在脚本的同目录。

Fabric

安装

pip install fabric

使用

from fabric import Connectionec2_hosts = ['70.xxx.250.xxx', '55.xx.222.xx']def connect_ec2(host):# 登录EC2conn = Connection(host=host, user='ec2-user', connect_kwargs={"key_filename": "qa.pem"})# 执行shell脚本conn.run("cd kvs-webrtc-sdk/ && bash multi_master_v2.sh")
  • cd kvs-webrtc-sdk/ && bash multi_master_v2.sh 是在 同一个 shell session 中连续执行的
  • 如果写成两条 conn.run(),每条命令会启动一个新的 shell,因此 cd 不会“保留路径”;
  • 可以执行任意复杂命令,比如:
conn.run("cd /data/scripts && chmod +x *.sh && ./deploy.sh")

Paramiko 与 Fabric 对比

工具本质底层依赖
Paramiko一个低层级的 SSH 库原生实现 SSH 协议
Fabric一个高级 SSH 操作库(自动化部署工具)使用了 Paramiko

Fabric 是基于 Paramiko 封装的更易用工具,提供了更简洁的 API 和自动化功能,适合执行多台服务器上的批量操作。

对比项ParamikoFabric
使用难度较低层级,需要手动管理连接、执行命令等较高层级,命令式风格更接近 Shell 脚本
灵活性高,可以精细控制每一步中,适合批量任务但不适合复杂交互
并发执行需要自己用 threading 或 multiprocessingFabric 2.x+ 原生支持并发执行(使用 Group 等)
自动化批处理要自己写逻辑很适合,比如部署、更新、批量执行命令
安装体积大(因为依赖更多,如 invoke)
学习成本稍高(偏底层)较低,语法接近 Shell,适合部署场景
需求类型推荐工具理由
只需连接一台机器,执行简单命令Paramiko更轻量、无额外依赖
批量连接多台服务器FabricAPI 更高级,支持并发、结果收集、任务复用等
自定义复杂 SSH 协议行为ParamikoFabric 无法精细操作底层,如端口转发、SFTP 文件校验等
熟悉 Shell + PythonFabric类似写 Shell 自动化脚本,集成方便

结论

基于以上分析,我选择的是用fabric。

python多线程工具选择

并发(Concurrency)指的是程序在同一个时间段内处理多个任务。

在 Python 中,实现并发的主要方式有:

并发模型核心模块适合场景
线程 Threadingthreading / concurrent.futures.ThreadPoolExecutorIO 密集型任务,如网络请求、文件读写
进程 Multiprocessingmultiprocessing / ProcessPoolExecutorCPU 密集型任务,如图像处理、加密计算
协程 Coroutineasyncio, trio, curio 等高并发、轻量级协程任务,如高并发服务器、爬虫

并发 vs 并行(多核)
并发:多个任务看起来在同时进行,实际是在一个线程中快速切换(适合 IO 密集)
并行:多个任务真正同时在多个 CPU 核心上执行(适合 CPU 密集)
在 Python 中,由于 GIL(全局解释器锁) 的存在,线程不能真正并行跑 Python 字节码,但对于 IO 密集型任务非常有效。

适合使用 ThreadPoolExecutor 的场景

  • 网络请求(如批量请求 API、ping 多个 EC2)
  • 文件操作(如并发读写多个日志)
  • 数据库访问(如查询多个数据源)

如果要跑大量 CPU 密集任务(如图像压缩、加密运算),应改用 ProcessPoolExecutor:

from concurrent.futures import ProcessPoolExecutor

我选择的是多线程并发。

最后代码

from fabric import Connection
from concurrent.futures import ThreadPoolExecutorec2_hosts = ['70.xxx.250.xxx', '55.xx.222.xx']def connect_ec2(host):# 登录EC2conn = Connection(host=host, user='ec2-user', connect_kwargs={"key_filename": "qa.pem"})# 执行shell脚本conn.run("cd kvs-webrtc-sdk/ && bash multi_master_v2.sh")with ThreadPoolExecutor() as pool:futures = [pool.submit(connect_ec2, host) for host in ec2_hosts]for future in futures:future.result()
  • 如果机器很多(10+),可以设置线程池最大并发数max_workers:
with ThreadPoolExecutor(max_workers=10) as pool:

表示最多允许10个线程并发。如果没传 max_workers,Python 会默认用 min(32, os.cpu_count() + 4)。

  • ThreadPoolExecutor()

这是 Python 提供的线程池执行器。它管理一个线程池来运行函数。
等价于手动用 threading.Thread(target=func).start() 启动多个线程,但它更安全、结构更清晰、具备异常处理能力。

  • pool.submit(connect_ec2, host)

submit 表示提交任务到线程池异步执行:

future = pool.submit(func, *args, **kwargs)

它会立即返回一个 Future 对象,任务会在后台某个线程中执行。

所以:

futures = [pool.submit(connect_ec2, host) for host in ec2_hosts]

表示:对 ec2_hosts 中每个主机,启动一个线程去运行 connect_ec2(host)。

  • future.result()

future.result() 表示阻塞等待某个任务执行完,并返回执行结果。

for future in futures:future.result()

1、表示要等所有线程都跑完,确保全部执行完成后再往下执行。
2、这段代码是“并发执行任务,顺序获取结果”,它不是串行运行函数,而是:
(1)任务已经同时提交、并发执行(线程池里跑着)
(2)for future in futures: 只是按顺序取回每个任务的结果
(3)取结果的时候,如果该任务还没执行完,就会阻塞等它完成
(4)实际任务早就并发地在跑了。
3、这个等待是必要的,因为线程执行是异步的,如果不等,有可能主线程就结束了。

总结

元素含义
ThreadPoolExecutor线程池执行器,用于 IO 并发任务
submit(func, arg)提交任务,返回 Future
future.result()阻塞等待任务完成,返回执行结果
max_workers=N控制最多并发线程数量

在pycharm中执行结果如下

/Users/testmanzhang/PycharmProjects/multi_process_launch_viewer.py 
🔄 当前并发数:1
🔄 当前并发数:2
🔄 当前并发数:3
🔄 当前并发数:4
🔄 当前并发数:5
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
🔄 当前并发数:1
>>> 正在处理 Channel: xxx, SN: xxx
🔄 当前并发数:2
>>> 正在处理 Channel: xxx, SN: xxx
🔄 当前并发数:3
🔄 当前并发数:4
🔄 当前并发数:5
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
>>> 正在处理 Channel: xxx, SN: xxx
...

这些打印实际上是服务器上面shell脚本的执行打印,其实是通过fabric同步到了当前代码执行的IDE中。

http://www.lryc.cn/news/602554.html

相关文章:

  • 科技赋能成长 脑力启迪未来
  • windows内核研究(异常-CPU异常记录)
  • 计算机视觉---Halcon概览
  • 暑期自学嵌入式——Day10(C语言阶段)
  • 生成器和迭代器的区别
  • 【65 Pandas+Pyecharts | 山东省2025年高考志愿投档数据分析可视化】
  • MCP架构:模型上下文协议的范式革命与工程实践
  • JSBridge原理与实现全解析
  • 嵌入式单片机中位带操作控制与实现
  • flutter使用firebase集成谷歌,苹果登录
  • C++20实战FlamingoIM开发
  • 和豆包玩的AI文字冒险游戏(可以当小说看)
  • 大模型推理框架基础概述
  • 4.应用层自定义协议与序列化
  • 【OS】真题 2015
  • k8s中Nvidia节点驱动的配置问题
  • Item18:让接口容易被正确使用,不易被误用
  • 设计模式(十五)行为型:命令模式详解
  • 计算机毕业设计java在线二手系统的设计与实现 基于Java的在线二手交易平台开发 Java技术驱动的二手物品管理系统
  • 低代码可视化AR远程协助、巡检、装配、质检新平台-元境智搭平台
  • MySQL高级配置与优化实战指南
  • 网站劫持是什么?如何防御?一篇简单科普
  • windows clion远程连接ubuntu运行调试nginx-1.22.1版本
  • MySQL有哪些“饮鸩止渴”提高性能的方法?
  • Linux应用程序架构与软件包管理
  • 在Windows下读写Linux EXT文件系统文件
  • VMWARE -ESXI-ntp时间同步无法启动异常处理
  • 用 Python 获取电脑电池电量的各种案例
  • ubuntu资源共享samba 安装与配置 mac/windows共享ubuntu文件资源
  • 暴雨服务器更懂人工智能+