当前位置: 首页 > news >正文

Python: 结合多进程和 Asyncio 以提高性能

动动发财的小手,点个赞吧!

简介

多亏了 GIL,使用多个线程来执行 CPU 密集型任务从来都不是一种选择。随着多核 CPU 的普及,Python 提供了一种多处理解决方案来执行 CPU 密集型任务。但是直到现在,直接使用多进程相关的API还是存在一些问题。

本文[1]开始之前,我们还有一小段代码来帮助演示:

import time
from multiprocessing import Process


def sum_to_num(final_num: int) -> int:
    start = time.monotonic()

    result = 0
    for i in range(0, final_num+11):
        result += i

    print(f"The method with {final_num} completed in {time.monotonic() - start:.2f} second(s).")
    return result

该方法接受一个参数并从 0 开始累加到该参数。打印方法执行时间并返回结果。

多进程存在的问题

def main():
    # We initialize the two processes with two parameters, from largest to smallest
    process_a = Process(target=sum_to_num, args=(200_000_000,))
    process_b = Process(target=sum_to_num, args=(50_000_000,))

    # And then let them start executing
    process_a.start()
    process_b.start()

    # Note that the join method is blocking and gets results sequentially
    start_a = time.monotonic()
    process_a.join()
    print(f"Process_a completed in {time.monotonic() - start_a:.2f} seconds")

    # Because when we wait process_a for join. The process_b has joined already.
    # so the time counter is 0 seconds.
    start_b = time.monotonic()
    process_b.join()
    print(f"Process_b completed in {time.monotonic() - start_b:.2f} seconds")

如代码所示,我们直接创建并启动多个进程,调用每个进程的start和join方法。但是,这里存在一些问题:

  1. join 方法不能返回任务执行的结果。
  2. join 方法阻塞主进程并按顺序执行它。

即使后面的任务比前面的任务执行得更快,如下图所示:

alt
alt

使用池的问题

如果我们使用multiprocessing.Pool,也会存在一些问题:

def main():
    with Pool() as pool:
        result_a = pool.apply(sum_to_num, args=(200_000_000,))
        result_b = pool.apply(sum_to_num, args=(50_000_000,))

        print(f"sum_to_num with 200_000_000 got a result of {result_a}.")
        print(f"sum_to_num with 50_000_000 got a result of {result_b}.")

如代码所示,Pool 的 apply 方法是同步的,这意味着您必须等待之前的 apply 任务完成才能开始执行下一个 apply 任务。

alt

当然,我们可以使用 apply_async 方法异步创建任务。但是同样,您需要使用 get 方法来阻塞地获取结果。它让我们回到 join 方法的问题:

def main():
    with Pool() as pool:
        result_a = pool.apply_async(sum_to_num, args=(200_000_000,))
        result_b = pool.apply_async(sum_to_num, args=(50_000_000,))

        print(f"sum_to_num with 200_000_000 got a result of {result_a.get()}.")
        print(f"sum_to_num with 50_000_000 got a result of {result_b.get()}.")
alt

直接使用ProcessPoolExecutor的问题

那么,如果我们使用 concurrent.futures.ProcesssPoolExecutor 来执行我们的 CPU 绑定任务呢?

def main():
    with ProcessPoolExecutor() as executor:
        numbers = [200_000_000, 50_000_000]
        for result in executor.map(sum_to_num, numbers):
            print(f"sum_to_num got a result which is {result}.")

如代码所示,一切看起来都很棒,并且就像 asyncio.as_completed 一样被调用。但是看看结果;它们仍按启动顺序获取。这与 asyncio.as_completed 完全不同,后者按照执行顺序获取结果:

alt
alt

使用 asyncio 的 run_in_executor 修复

幸运的是,我们可以使用 asyncio 来处理 IO-bound 任务,它的 run_in_executor 方法可以像 asyncio 一样调用多进程任务。不仅统一了并发和并行的API,还解决了我们上面遇到的各种问题:

async def main():
    loop = asyncio.get_running_loop()
    tasks = []

    with ProcessPoolExecutor() as executor:
        for number in [200_000_000, 50_000_000]:
            tasks.append(loop.run_in_executor(executor, sum_to_num, number))
        
        # Or we can just use the method asyncio.gather(*tasks)
        for done in asyncio.as_completed(tasks):
            result = await done
            print(f"sum_to_num got a result which is {result}")
alt

由于上一篇的示例代码都是模拟我们应该调用的并发过程的方法,所以很多读者在学习之后在实际编码中还是需要帮助理解如何使用。所以在了解了为什么我们需要在asyncio中执行CPU-bound并行任务之后,今天我们将通过一个真实世界的例子来解释如何使用asyncio同时处理IO-bound和CPU-bound任务,并领略asyncio对我们的效率代码。

Reference

[1]

Source: https://towardsdatascience.com/combining-multiprocessing-and-asyncio-in-python-for-performance-boosts-15496ffe96b

本文由 mdnice 多平台发布

http://www.lryc.cn/news/68987.html

相关文章:

  • 只需要两步就能快速接入GPT
  • 使用Git-lfs上传超过100m的大文件到GitHub
  • 【网络】计算机中的网络
  • 什么是语音识别的语音助手?
  • 自己动手写一个加载器
  • C# 性能优化和Unity性能优化
  • 面试题背麻了,花3个月面过华为测开岗,拿个26K不过分吧?
  • 跟着我学 AI丨教育 + AI = 一对一教学
  • 1-动态规划算法理论基础
  • kafka延时队列内部应用简介
  • 【网络】HTTPHTTPS协议
  • 因子图优化
  • JVM 垃圾回收相关算法
  • [Bing Chat] 以某某这个数据结构 编一个故事 要求不能说出某某这个数据结构
  • 【算法】【算法杂谈】一种字符串和数字的对应关系
  • Java并发基础理论
  • ubuntu22.04静态ip设置(桥接模式、only-host+NAT模式)
  • 深度模型中的正则化、梯度裁剪、偏置初始化操作
  • 设计模式之装饰模式
  • 华为OD机试真题 Java 实现【最佳对手】【2023Q1 200分】
  • IOS证书制作教程
  • 【人工智能】蚁群算法(密恐勿入)
  • VONR排查指导分享
  • Daftart.ai:人工智能专辑封面生成器
  • ZigBee案例笔记 - 定时器
  • GE H201TI 全系统自检和自诊断
  • 这个屏幕录制太好用了!
  • 初识redis【redis的安装使用与卸载】
  • 接口测试总结及其用例设计方法整理,希望可以帮到你
  • 基于FPGA的多功能数字钟的设计