当前位置: 首页 > article >正文

Python备忘

1. 自定义多线程程序:

import concurrent.futures
import threadingclass CustomThreadPool:def __init__(self, max_workers):self.max_workers = max_workersself.pool = concurrent.futures.ThreadPoolExecutor(max_workers)self.running_num = 0self.semaphore = threading.Semaphore(max_workers)def submit(self, fn, *args, **kwargs):self.semaphore.acquire()  # 获取信号量(如果已满则阻塞)self.running_num += 1future = self.pool.submit(fn, *args, **kwargs)future.add_done_callback(self._callback)return futuredef _callback(self, future):self.running_num -= 1self.semaphore.release()  # 释放信号量,允许新任务提交def shutdown(self):self.pool.shutdown()

以上程序实现:使用这个自定义线程池时,当并发任务数达到上限后,新提交的任务会被阻塞,直到有任务完成。

使用程序:

import time
import random
from custom_threadpool import CustomThreadPool  # 假设上面的类保存在 custom_threadpool.py 中# 模拟下载图片的函数(实际应用中可替换为 requests.get 等真实下载逻辑)
def download_image(url):# 模拟网络请求延迟delay = random.uniform(0.5, 2.0)time.sleep(delay)# 模拟下载结果size = random.randint(100, 1000)print(f"下载完成: {url} (大小: {size}KB, 耗时: {delay:.2f}s)")return {"url": url, "size": size, "status": "success"}# 主程序
def main():# 创建一个最大并发数为3的线程池with CustomThreadPool(max_workers=3) as pool:# 模拟10个图片下载任务image_urls = [f"https://example.com/image_{i}.jpg" for i in range(1, 11)]# 提交所有下载任务futures = []for url in image_urls:future = pool.submit(download_image, url)futures.append(future)# 获取所有任务的结果(按完成顺序输出)for future in concurrent.futures.as_completed(futures):try:result = future.result()print(f"处理结果: {result['url']} ({result['size']}KB)")except Exception as e:print(f"下载失败: {e}")if __name__ == "__main__":start_time = time.time()main()print(f"总耗时: {time.time() - start_time:.2f}秒")

2.* ** 参数:

def example(a, b, *args, **kwargs):print(f"固定参数:a={a}, b={b}")print(f"位置参数:{args}")print(f"关键字参数:{kwargs}")example(1, 2, 3, 4, x=5, y=6)
# 输出:
# 固定参数:a=1, b=2
# 位置参数:(3, 4)
# 关键字参数:{'x': 5, 'y': 6}

 

http://www.lryc.cn/news/2402851.html

相关文章:

  • 如何在 Windows 11 中永久更改默认浏览器:阻止 Edge 占据主导地位
  • 量子比特实现方式
  • 智慧水务发展迅猛:从物联网架构到AIoT系统的跨越式升级
  • 1、cpp实现Python的print函数
  • 【Linux基础知识系列】第十四篇-系统监控与性能优化
  • 云原生思维重塑数字化基座:从理念到实践的深度剖析
  • Animate On Scroll 用于在用户滚动页面时实现元素的动画效果
  • Java高级 | 【实验五】Spring boot+mybatis操作数据库
  • [蓝桥杯]搭积木
  • 在MATLAB中使用自定义的ROS2消息
  • 使用C/C++和OpenCV实现图像拼接
  • 神经网络-Day46
  • Ubuntu中常用的网络命令指南
  • JVM——如何打造一个类加载器?
  • 【MATLAB去噪算法】基于ICEEMDAN联合小波阈值去噪算法
  • c++ Base58编码解码
  • 证券交易柜台系统解析与LinkCounter解决方案开发实践
  • XXTEA,XTEA与TEA
  • 机器人玩转之---嵌入式开发板基础知识到实战选型指南(包含ORIN、RDK X5、Raspberry pi、RK系列等)
  • 腾讯云国际版和国内版账户通用吗?一样吗?为什么?
  • OrCAD X Capture CIS设计小诀窍系列第二季--03.如何在Capture中输出带有目录和元器件信息的PDF
  • 汽车的安全性能测试:试验台铁地板的重要性
  • Lua和JS的垃圾回收机制
  • 实践指南:从零开始搭建RAG驱动的智能问答系统
  • 边缘计算服务器
  • 矩阵的偏导数
  • 第R9周:阿尔茨海默病诊断(优化特征选择版)
  • 电动螺丝刀-多实体拆图建模案例
  • 当丰收季遇上超导磁测量:粮食产业的科技新征程
  • 电子电气架构 --- 什么是功能架构?