当前位置: 首页 > news >正文

Redis(⑤-线程池隔离)

代码

import redis
import time
from concurrent.futures import ThreadPoolExecutor
import threading# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)# 1. 定义两个隔离的线程池(不同业务用不同线程池)
# 业务A:商品查询(分配5个线程)
query_pool = ThreadPoolExecutor(max_workers=5, thread_name_prefix="query_")
# 业务B:下单(分配3个线程)
order_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="order_")# 2. 业务函数:查询商品(模拟正常操作)
def query_product(product_id):# 用Redis记录当前线程和任务状态thread_name = threading.current_thread().namer.hset(f"task:status:{product_id}", mapping={"thread": thread_name,"status": "processing","time": time.time()})# 模拟查询操作(访问Redis获取商品信息)product_info = r.get(f"product:{product_id}") or f"Product {product_id} info"time.sleep(0.1)  # 模拟耗时# 更新任务状态为完成r.hset(f"task:status:{product_id}", "status", "completed")return f"Query {product_id}: {product_info}"# 3. 业务函数:下单(可能出现阻塞)
def create_order(order_id):thread_name = threading.current_thread().namer.hset(f"task:status:order_{order_id}", mapping={"thread": thread_name,"status": "processing","time": time.time()})# 模拟下单操作(可能因Redis超时阻塞)try:# 模拟Redis操作(这里故意加随机阻塞,模拟故障)if order_id % 5 == 0:time.sleep(5)  # 每5个订单模拟一次超时r.set(f"order:{order_id}", f"Order {order_id} created")except Exception as e:r.hset(f"task:status:order_{order_id}", "status", f"failed: {str(e)}")return f"Order {order_id} failed: {str(e)}"r.hset(f"task:status:order_{order_id}", "status", "completed")return f"Order {order_id} created successfully"# 4. 模拟高并发请求
def simulate_traffic():# 初始化一些商品数据到Redisfor i in range(10):r.set(f"product:{i}", f"Product {i} - Price ${i*10}")# 提交大量商品查询任务(用query_pool)for i in range(20):query_pool.submit(lambda pid: print(query_product(pid)), i)# 提交一批下单任务(用order_pool,包含可能阻塞的任务)for i in range(10):order_pool.submit(lambda oid: print(create_order(oid)), i)# 等待所有任务完成query_pool.shutdown(wait=True)order_pool.shutdown(wait=True)print("所有任务处理完毕")if __name__ == "__main__":simulate_traffic()# 可以在Redis中查看任务状态:HGETALL task:status:xxx

核心隔离逻辑(对应代码思路)

操作含义对应代码中的线程池隔离思想
用 query:* 和 order:* 区分键不同业务用独立的 Redis 键空间不同业务用独立的线程池资源
业务 B 阻塞时业务 A 正常运行键空间隔离,互不干扰线程池隔离,一个业务阻塞不影响其他业务
HSET xxx thread "xxx"记录任务所属 “线程”线程池记录任务由哪个线程处理

两个业务(“商品库存查询” 和 “商品库存扣减”)操作同一个 Redis 键stock:1001,代表商品 1001 的库存),但使用独立线程池,即使其中一个业务线程拥堵,另一个仍能正常运行。

共享的 Redis 键:stock:1001(值为 100,代表商品 1001 的库存)。
业务 A(查询库存):用线程池 A(5 个线程),每次操作GET stock:1001,耗时 0.1 秒(快速)。
业务 B(扣减库存):用线程池 B(3 个线程),每次操作DECR stock:1001,但每 10 次操作会随机阻塞 5 秒(模拟故障)。

两个业务操作同一个 Redis 键,但因为用了独立的线程池,线程资源互不共享:
业务 B 的线程拥堵(线程池 B 耗尽),只会影响自身的任务处理。
业务 A 的线程池 A 仍有空闲线程,能正常处理查询任务。

极简的线程池隔离示例,用两个独立线程池处理不同任务

核心隔离点:
创建了两个完全独立的线程池 fast_pool(3 线程)和 slow_pool(2 线程),分别处理不同任务。
运行现象:
慢速任务中,任务 0 会阻塞 5 秒,占用 slow_pool 的 1 个线程
此时 slow_pool 只剩 1 个空闲线程,新的慢速任务需要排队
但 fast_pool 的 3 个线程不受影响,5 个快速任务会快速执行完毕
隔离效果:
即使慢速任务线程池因阻塞被占满,快速任务线程池仍能正常工作,两者互不干扰。

import time
from concurrent.futures import ThreadPoolExecutor# 1. 创建两个隔离的线程池
# 线程池A:处理快速任务(3个线程)
fast_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="fast_")
# 线程池B:处理可能阻塞的任务(2个线程)
slow_pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="slow_")# 2. 快速任务(模拟正常业务)
def fast_task(task_id):print(f"快速任务{task_id} 开始(线程:{threading.current_thread().name})")time.sleep(0.5)  # 快速处理print(f"快速任务{task_id} 完成")return task_id# 3. 慢速任务(模拟可能阻塞的业务)
def slow_task(task_id):print(f"慢速任务{task_id} 开始(线程:{threading.current_thread().name})")# 每3个任务模拟一次阻塞if task_id % 3 == 0:time.sleep(5)  # 长时间阻塞else:time.sleep(1)print(f"慢速任务{task_id} 完成")return task_id# 4. 模拟并发请求
if __name__ == "__main__":import threading  # 局部导入,避免命名冲突# 提交5个快速任务到fast_poolfor i in range(5):fast_pool.submit(fast_task, i)# 提交4个慢速任务到slow_pool(其中任务0和3会阻塞5秒)for i in range(4):slow_pool.submit(slow_task, i)# 等待所有任务完成fast_pool.shutdown(wait=True)slow_pool.shutdown(wait=True)print("所有任务处理完毕")

运行后可以看到,快速任务会全部快速完成,而慢速任务会因为阻塞出现排队,但不会影响快速任务的执行节奏 —— 这就是线程池隔离的核心作用。

http://www.lryc.cn/news/614425.html

相关文章:

  • 【从0到1制作一块STM32开发板】6. PCB布线--信号部分
  • React函数组件灵魂搭档:useEffect深度通关指南!
  • 如何实现在多跳UDP传输场景,保证单文件和多文件完整传输的成功率?
  • 三相交流电机旋转磁场产生原理
  • Django模型开发全解析:字段、元数据与继承的实战指南
  • Flutter开发 多孩子布局组件
  • [202403-B]算日期
  • 蓝桥杯----大模板
  • V4L2摄像头采集 + WiFi实时传输实战全流程
  • FreeRTOS入门知识(初识RTOS)(一)
  • Chat GPT5功能
  • 使用 Gulp 替换 XML 文件内容
  • 明厨亮灶场景下误检率↓76%:陌讯多模态融合算法实战解析
  • Ignite节点生命周期钩子机制详解
  • 基于Spring Boot的Minio图片定时清理实践总结
  • 如何使用Databinding实现MVVM架构
  • GPT5新功能介绍以及和其他模型对比
  • InfluxDB漏洞:Metrics 未授权访问漏洞
  • 借助Rclone快速从阿里云OSS迁移到AWS S3
  • 【数据结构】哈希扩展学习
  • 在 Mac 上安装 IntelliJ IDEA
  • 达梦(DM)闪回使用介绍
  • 智能云探索:基于Amazon Bedrock与MCP Server的AWS资源AI运维实践
  • 微信小程序miniprogram-ci 模块实现微信小程序的自动上传功能
  • 微型导轨在半导体制造中有哪些高精密应用场景?
  • 5 种简单方法将 Safari 书签转移到新 iPhone
  • 苹果iPhone 17系列将发售,如何解决部分软件适配问题引发讨论
  • 3 种简单方法备份 iPhone 上的短信 [2025]
  • 若以微服务部署踩坑点
  • Day10 SpringAOP