当前位置: 首页 > news >正文

Python学习(1):使用Python的Dask库实现并行计算

目录

一、Dask介绍

二、使用说明

安装

三、测试

1、单个文件中实现功能

2、运行多个可执行文件


最近在写并行计算相关部分,用到了python的Dask库。

Dask官网:Dask | Scale the Python tools you love

一、Dask介绍

Dask是一个灵活的并行和分布式计算库,旨在处理大规模数据集。它提供了类似于Pandas 和 NumPy 的数据结构,但能够有效处理比内存更大的数据集。通过使用Dask,可以在单台机器或分布式集群中运行,更方便处理大规模数据。

Dask是一个用于Python的并行计算模块,从单机多核扩展到拥有数千台机器的数据中心。它既由低级任务API,也有更高级面向数据的API。低级任务API支持Dask与多种Python库的集成,公共API为围绕Dask发展的各种工具的生态系统提供了基础。

Dask相较于Spark这些大数据处理框架,更轻量级。Dask更侧重与其他框架,如:Numpy、Pandas,Scikit-learning相结合,从而使其能更加方便进行分布式并行计算。

Dask存在三种最基本的数据结构,分别是:Arrays、Dataframes以及Bags

二、使用说明

安装

pip install dask
python -m pip install "dask[array]"
python -m pip install "dask[distributed]"
python -m pip install "dask[dataframe]"

先测试是否已经安装了模块,命令行进入到python3编辑器:

from dask.distributed import Client, progress

没有报缺少模块错误,则说明是可以正常执行的。

三、测试

1、单个文件中实现功能

下述的主要数据处理在定义计算任务函数calculate_value(num)中,即将计算任务函数处理32次。

from dask.distributed import Client, progress
import time# 定义计算任务的函数
def calculate_value(num):num_float = float(num) * 0.33num_double = float(num) * 0.33  return num_float, num_double# 设置Dask客户端
def setup_client():from dask.distributed import Client, LocalClustercluster = LocalCluster()client = Client(cluster)scheduler_info = client.scheduler_info()ncores = sum(worker['nthreads'] for worker in scheduler_info['workers'].values())print(f"Connected to Dask cluster with {ncores} cores")return client# 提交任务并收集结果
def submit_tasks(client, num, num_tasks=32):# 创建任务列表tasks = [client.submit(calculate_value, num) for _ in range(num_tasks)]# 等待所有任务完成,并显示进度progress(tasks)# 收集结果results = [task.result() for task in tasks]return results# 主函数
def main():num = 558558571  # 这是您要处理的数字client = setup_client()  # 设置Dask客户端# 提交32个任务results = submit_tasks(client, num)# 打印结果for i, (num_float, num_double) in enumerate(results):print(f"Task {i+1} - num_float: {num_float}, num_double: {num_double}")# 关闭客户端连接client.close()if __name__ == "__main__":main()

运行上述的python程序:

python3 my_dask_script.py

执行结果如下:

此时表示运行了32个task。

在运行的时候如果提示:

表明 dask-scheduler 无法启动,原因是端口 8787 已经被占用了。

解决方法:

1、查找并终止占用端口 8787 的进程

(1)先安装lsof:

apt install lsof

(2)查看占用端口进程:

lsof -i :8787

(3)通过进程的 PID 使用 kill 命令终止该进程:

kill -9 PID

2、修改 dask-scheduler 使用的端口

dask-scheduler --port 8888

再次重新启动查看 dask-scheduler 使用的端口:

dask-scheduler

2、运行多个可执行文件

我在同目录中创建了一个test.cc文件,为简单的打印数据,内容如下:

#include <iostream>
#include <iomanip>int main() {int num = 558558571;float num_float = static_cast<float>(num) * 0.33;double num_double = static_cast<double>(num) * 0.33;std::cout << "num value: " << num << std::endl;std::cout << std::fixed << std::setprecision(2);std::cout << "num_float value: " << num_float << std::endl;std::cout << "num_double value: " << num_double << std::endl;return 0;
}

此时将上述的test.cc编译:

g++ -o main test.cc

然后新建一个my_dask_script.py文件,内容如下:

from dask.distributed import Client, LocalCluster
import os# 定义执行外部程序的函数
def run_external_program():cmd = './main'  # 您的外部程序命令os.system(cmd)  # 使用os.system来执行命令# 设置Dask客户端
def setup_client():from dask.distributed import Client, LocalClustercluster = LocalCluster()client = Client(cluster)scheduler_info = client.scheduler_info()ncores = sum(worker['nthreads'] for worker in scheduler_info['workers'].values())print(f"Connected to Dask cluster with {ncores} cores")return client# 提交任务到Dask集群
def submit_tasks(client, num_tasks=32):futures = [client.submit(run_external_program) for _ in range(num_tasks)]return futures# 主函数
def main():client = setup_client()  # 设置Dask客户端futures = submit_tasks(client)  # 提交任务# 等待所有任务完成client.gather(futures)# 关闭客户端连接client.close()if __name__ == "__main__":main()

运行结果:

此时表示上述的可执行文件main已运行了32份。

http://www.lryc.cn/news/418085.html

相关文章:

  • 数据结构 - 哈希表
  • 电商选品这几点没做好,等于放弃80%的流量!
  • 【教程】最新可用!Docker国内镜像源列表
  • 使用RabbitMQ在Spring Boot入门实现简单的消息的发送与接收
  • 基于物联网的水质监测系统设计与实现:React前端、Node.js后端与TCP/IP协议的云平台集成(代码示例)
  • Vcpkg安装指定版本包或自定义安装包
  • 【C++深度探索】红黑树实现Set与Map的封装
  • 终于有人把客户成功讲明白了
  • [新械专栏] 肾动脉射频消融仪及一次性使用网状肾动脉射频消融导管获批上市
  • leetcode-119-杨辉三角II
  • 【第八节】python正则表达式
  • 三大浏览器Google Chrome、Edge、Firefox内存占用对比
  • 【wiki知识库】08.添加用户登录功能--后端SpringBoot部分
  • vue中nextTick的作用
  • 计算机网络面试-核心概念-问题理解
  • go语言创建协程
  • RabbitMQ之基于注解声明队列交换机:使用@RabbitListener实现消息监听
  • 【grafana 】mac端grafana配置的文件 grafana.ini 及login
  • 程序员如何在人工智能时代保持核心竞争力
  • 回溯排列+棋盘问题篇--代码随想录算法训练营第二十三天| 46.全排列,47.全排列 II,51. N皇后,37. 解数独
  • ESXI加入VMware现有集群提示常规性错误
  • 数字噪音计(声级计)【AR814数字噪音计】
  • 【Vue3】图片未加载成功前占位
  • AbstractQueuedSynchronizer之AQS
  • <数据集>起重机识别数据集<目标检测>
  • 04--Docker
  • MiniCPM-V: A GPT-4V Level MLLM on Your Phone 手机上的 GPT-4V 级多模态大模型
  • Unity初识
  • 【游戏引擎之路】登神长阶(九)——《3D游戏编程大师技巧》:我想成为游戏之神!
  • Linux:线程同步之信号量