当前位置: 首页 > news >正文

Python:消息队列(RabbitMQ)应用开发实践

文章目录

  • 前言
  • 一、名词解释
    • 1、核心概念
    • 2、交换器类型
  • 二、消息模式
    • 1、简单模式
    • 2、工作队列
      • 消息持久性
    • 3、发布订阅
    • 4、路由模式
    • 5、主题模式
      • 通配符规则
    • 6、RPC模式
      • RPC工作流程
  • 三、安装部署
    • 1、拉取镜像
    • 2、启动容器
    • 3、使用`docker-compose`
  • 四、应用测试
    • 1、安装依赖
    • 2、创建连接
      • 2.1、引入依赖
      • 2.2、创建连接参数
      • 2.3、建立连接
    • 3、生产消息
    • 4、消费消息
  • 总结


前言

  官网有云:RabbitMQ是一个可靠且成熟的消息传递和流媒体代理,易于在云环境、本地和本地计算机上部署。支持多种开放标准协议,包括 AMQP 1.0MQTT 5.0。提供了许多选项,定义消息如何从发布者发送到一个或多个使用者。路由、筛选、流式处理、联合身份验证等。通过确认消息传输和跨集群复制消息的能力,确保消息是安全的。

一、名词解释

1、核心概念

  • 生产者(Producer):发送消息到队列的应用程序。
  • 消费者(Consumer):从队列中接收消息的应用程序。
  • 交换器(Exchange):接收来自生产者的消息,并根据绑定规则将消息路由到相应的队列。
  • 队列(Queue):存储消息,等待消费者处理。
  • 绑定(Binding):定义交换器和队列之间的关系。
  • 头交换器(Headers Exchange):使用消息头属性进行路由,而不是路由键。

2、交换器类型

  • 直接交换器(Direct Exchange):根据消息的路由键精确匹配队列。
  • 主题交换器(Topic Exchange):根据模式匹配路由键,将消息路由到一个或多个队列。
  • 扇出交换器(Fanout Exchange):将消息广播到所有绑定的队列,不考虑路由键。

二、消息模式

  RabbitMQ提供6种消息模式:简单模式、工作队列、发布订阅、路由模式、主题模式和RPC模式。

1、简单模式

  最简单的队列。

在这里插入图片描述

  • P:生产者,也就是要发送消息的程序。
channel.queue_declare(queue=queue_name)
channel.basic_publish(exchange='',routing_key=queue_name,body='Hello World!'
)

  参数exchange不指定时,消息会发送的默认的Exchange,此时的routing_key必须是queue_name(队列名称)。

  • C:消费者:消息的接受者,会一直等待消息到来。
channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback
)

  参数 auto_ack=True 表示自动应答,消费者取出消息后,消息队列将删除此消息。

  • Queue:消息队列。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

2、工作队列

  在Worker之间分配任务(竞争消费者模式)。两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。

在这里插入图片描述
  通过 BasicQos 方法设置prefetchCount = 1,如:channel.basic_qos(0, 1, False);。使得每个消费者在同一个时间点最多处理1个消息。prefetchCount只有在手动ackauto_ack=False)的情况下才生效,自动ack不生效。

消息持久性

  使用手动ackauto_ack=False)能够确保消息处理失败或者消费者崩溃时任务不会丢失,但是如果RabbitMQ服务器停止、退出或者崩溃时,队列中的消息将丢失。需要做两件事来确保消息不会丢失:

  • 首先,定义队列时指定队列是持久的(durable=True),如:
channel.queue_declare(queue=queue_name, durable=True)

  如果队列已

http://www.lryc.cn/news/589495.html

相关文章:

  • BlueLotus XSS管理后台使用指南
  • 数据结构自学Day7-- 二叉树
  • 自增主键为什么不是连续的?
  • 策略设计模式分析
  • Git Bash 实战操作全解析:从初始化到版本管理的每一步细节
  • Spring Boot 启动原理揭秘:从 main 方法到自动装配
  • c#进阶之数据结构(字符串篇)----String
  • HTTP常见误区
  • 跨平台移动开发技术深度分析:uni-app、React Native与Flutter的迁移成本、性能、场景与前景
  • 【网络安全】大型语言模型(LLMs)及其应用的红队演练指南
  • 物联网系统中MQTT设备数据的保存方法
  • 闲庭信步使用图像验证平台加速FPGA的开发:第十七课——图像高斯滤波的FPGA实现
  • 基于Langchain4j开发AI编程助手
  • 无人机GPS定位系统核心技术解析
  • 图像的读入、显示、保存和图像文件显示
  • 笔试——Day9
  • IMU 能为无人机提供什么数据?
  • 北京-4年功能测试2年空窗-报培训班学测开-第五十一天
  • 快速通关二叉树秘籍(下)
  • Rocky Linux 9 源码包安装php8
  • ChatTongyi × LangChain:开启多模态AI应用创新之门
  • 共射级放大电路的频率响应Multisim电路仿真——硬件工程师笔记
  • 50天50个小项目 (Vue3 + Tailwindcss V4) ✨ | DoubleClickHeart(双击爱心)
  • [设计模式]C++单例模式的几种写法以及通用模板
  • Kubernetes 架构原理与集群环境部署
  • 降本增效!自动化UI测试平台TestComplete并行测试亮点
  • 2025最新国产用例管理工具评测:Gitee Test、禅道、蓝凌测试、TestOps 哪家更懂研发协同?
  • ESLint 除了在packages.json还能在哪里配置?
  • 实测两款效率工具:驾考刷题和证件照处理的免费方案
  • CF37E Trial for Chief 题解