当前位置: 首页 > news >正文

Fluent Python 笔记 第 17 章 使用 future 处理并发

future 指一种对象,表示异步执行的操作。这个概念的作用很大,是 concurrent.futures 模块和 asyncio 包(第 18 章讨论)的基础。

17.1 示例:网络下载的三种风格

17.1.1 依序下载的脚本

17.1.2 使用 concurrent.futures 模块下载

from concurrent import futuresworkers = min(MAX_WORKERS, len(cc_list))
with futures.ThreadPoolExecutor(workers) as executor:res = executor.map(download_one, sorted(cc_list))

17.1.3 future 在哪里

从 Python 3.4 起,标准库中有两个名为 Future 的 类:concurrent.futures.Futureasyncio.Future。这两个类的作用相同:两个 Future 类的实例都表示可能已经完成或者尚未完成的延迟计算。

通常情况下自己不应该创建 future

这两种 future 都有 .done() 方法,这个方法不阻塞,返回值是布尔值,指明 future 链接的 可调用对象是否已经执行。客户端代码通常不会询问 future 是否运行结束,而是会等待通知。

因此,两个 Future 类都有 .add_done_callback() 方法。

还有 .result() 方法。在 future 运行结束后调用的话,这个方法在两个 Future 类 中的作用相同:返回可调用对象的结果,或者重新抛出执行可调用的对象时抛出的异常。 可是,如果 future 没有运行结束,result 方法在两个 Future 类中的行为相差很大。对 concurrency.futures.Future 实例来说,调用 f.result() 方法会阻塞调用方所在的线程,直到有结果可返回。此时,result 方法可以接收可选的 timeout 参数,如果在指定的时间内 future 没有运行完毕,会抛出 TimeoutError 异常。asyncio. Future.result 方法不支持设定超时时间,在那个库中获取 future 的结果最好使用 yield from 结构。不过,对 concurrency.futures.Future 实例不能这么做。

def download_many(cc_list):cc_list = cc_list[:5]with futures.ThreadPoolExecutor(max_workers=3) as executor:to_do = []for cc in sorted(cc_list):future = executor.submit(download_one, cc)to_do.append(future)msg = 'Scheduled for {}: {}'print(msg.format(cc, future))results = []for future in futures.as_completed(to_do):res = future.result()msg = '{} result: {!r}'print(msg.format(future, res))results.append(res)return len(results)

严格来说,我们目前测试的并发脚本都不能并行下载。使用 concurrent.futures 库实现的 那两个示例受 GIL(Global Interpreter Lock,全局解释器锁)的限制,而 flags_asyncio.py 脚本在单个线程中运行。

GIL 几乎对 I/O 密集型处理无害。

17.2 阻塞型I/O和GIL

CPython 解释器本身就不是线程安全的,因此有全局解释器锁(GIL),一次只允许使用一个线程执行 Python 字节码。因此,一个 Python 进程通常不能同时使用多个 CPU 核心。这是 CPython 解释器的局限,与 Python 语言本身无关。标准库中所有执行阻塞型 I/O 操作的函数,在等待操作系统返回结果时都会释放 GIL。

17.3 使用 concurrent.futures 模块启动进程

这个模块实现的是真正的并行计算,因为它使用 ProcessPoolExecutor 类把工作分配给多个 Python 进程处理。因此,如果需要做 CPU 密集型处理,使用这个模块能绕开 GIL,利用所有可用的 CPU 核心。

def download_many(cc_list):workers = min(MAX_WORKERS, len(cc_list))with futures.ThreadPoolExecutor(workers) as executor:

改成:

def download_many(cc_list):with futures.ProcessPoolExecutor() as executor:

ThreadPool Executor.__init__ 方需要 max_workers 参数, 指定线程池中线程的数量。 在ProcessPoolExecutor 类中,那个参数是可选的,而且大多数情况下不使用——默认值是 os.cpu_count() 函数返回的 CPU 数量。

如果使用 Python 处理 CPU 密集型工作,应该试试 PyPy(http://pypy.org)。

17.4 实验 Executor.map 方法

executor = futures.ThreadPoolExecutor(max_workers=3)
results = executor.map(loiter, range(5))  # 返回的是一个生成器
for i, result in enumerate(results):
...

for 循环中的 enumerate 函数会隐式调用 next(results),这个函数又会在(内部)表示第一个任务(loiter(0))的 _f future 上调用 _f.result() 方法。result 方法会阻塞,直到 future 运行结束,因此这个循环每次迭代时都要等待下一个结果做好准备。

Executor.map 函数返回结果的顺序与调用开始的顺序一致。

executor.submitfutures.as_completed 这个组合比 executor.map 更灵活, 因为 submit 方法能处理不同的可调用对象和参数,而 executor.map 只能处理参数不同的同一个可调用对象。此外,传给 futures.as_completed 函数的 future 集合可以来自多个 Executor 实例,例如一些由 ThreadPoolExecutor 实例创建,另一些由 ProcessPoolExecutor 实例创建。

17.5 显示下载进度并处理错误

17.5.1 flags2系列示例处理错误的方式

17.5.2 使用 futures.as_completed 函数

with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:to_do_map = {}for cc in sorted(cc_list):future = executor.submit(download_one, cc, base_url, verbose)to_do_map[future] = ccdone_iter = futures.as_completed(to_do_map)for future in done_iter:try:res = future.result()except requests.exceptions.HTTPError as exc:error_msg = 'HTTP {res.status_code} - {res.reason}'error_msg = error_msg.format(res=exc.response)except requests.exceptions.ConnectionError as exc:error_msg = 'Connection error'else:error_msg = ''status = res.status

futures.as_completed 函数特别有用的惯用法:构建一个字典, 把各个 future 映射到其他数据(future 运行结束后可能有用)上。这里,在 to_do_map 中, 我们把各个 future 映射到对应的国家代码上。这样,尽管 future 生成的结果顺序已经乱了,依然便于使用结果做后续处理。

17.5.3 线程和多进程的替代方案

multiprocessing 模块 还能解决协作进程遇到的最大挑战:在进程之间传递数据。

http://www.lryc.cn/news/7320.html

相关文章:

  • Android进阶之路 - StringUtils、NumberUtils 场景源码
  • 装备制造业数字化转型CRM系统解决方案(信息图)
  • CGAL 二维剖分
  • node.js+vue婚纱影楼摄影婚庆管理系统vscode项目
  • C语言 指针的新理解
  • 【向每个应用View中增加子控件 Objective-C语言】
  • 【FPGA】Verilog:组合电路设计 | 三输入 | 多数表决器
  • 【安全等保】安全等保二级和三级哪个高?哪个费用更高?
  • C++ STL学习记录(v1)
  • 开发中遇到的问题
  • Javascript笔记
  • Elasticsearch(ES)配置及优化
  • 一文看懂Java语言与Java生态圈
  • GitHub 上有什么嵌入式方面的项目?
  • 【C语言进阶】结构体、位段、枚举和联合
  • markdown和latex常用部分参考@注脚@链接跳转@csdn
  • Java 在二叉树中增加一行
  • kubernetes(k8s) 知识总结(第2期)
  • windows-Mysql的主从数据库同步设置
  • Docker逃逸
  • k8s项目部署
  • Modbus通信协议学习笔记
  • ubuntu重启、关机命令
  • Xshell 7 连接云服务器的步骤和出现的错误
  • Python多进程同步——文件锁
  • 实现 element-plus 表格多选时按 shift 进行连选的功能
  • 华为OD机试真题JAVA实现【考古学家】真题+解题思路+代码(20222023)
  • Spring3之基于Aspect实现AOP
  • buctoj-寒假集训进阶训练赛(二十二)
  • 华为OD机试真题JAVA实现【静态扫描最优成本】真题+解题思路+代码(20222023)