当前位置: 首页 > news >正文

RabbitMQ 入门教程

介绍

RabbitMQ 是一个消息中间件,它实现了 AMQP (Advanced Message Queuing Protocol) 协议。本教程将引导你通过几个简单的步骤来学习如何使用 RabbitMQ 发送和接收消息。

环境准备

1. 安装 RabbitMQ

- 在你的系统上安装 RabbitMQ: https://www.rabbitmq.com/download.html

- 启动服务: `sudo rabbitmq-server`

2. 安装客户端库

- Python 示例将使用 `pika` 库: `pip install pika`

第一步: 创建生产者

创建一个简单的生产者,用于发送消息到 RabbitMQ 服务器。

```python

import pika

def main():

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')

message = "Hello World!"

channel.basic_publish(exchange='',

routing_key='hello',

body=message)

print(" [x] Sent %r" % message)

connection.close()

if __name__ == '__main__':

main()

```

第二步: 创建消费者

创建一个简单的消费者,用于接收来自 RabbitMQ 服务器的消息。

```python

import pika

def callback(ch, method, properties, body):

print(" [x] Received %r" % body)

def main():

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_consume(queue='hello',

on_message_callback=callback,

auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()

if __name__ == '__main__':

main()

```

第三步: 使用持久化消息

确保消息在 RabbitMQ 重启后仍然存在。

生产者代码修改

```python

import pika

def main():

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello', durable=True)

message = "Hello World!"

channel.basic_publish(exchange='',

routing_key='hello',

body=message,

properties=pika.BasicProperties(

delivery_mode=2, # make message persistent

))

print(" [x] Sent %r" % message)

connection.close()

if __name__ == '__main__':

main()

```

消费者代码修改

```python

import pika

def callback(ch, method, properties, body):

print(" [x] Received %r" % body)

def main():

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello', durable=True)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(queue='hello',

on_message_callback=callback,

auto_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()

if __name__ == '__main__':

main()

```

第四步: 使用工作队列

实现一个简单的工作队列,可以分发任务给多个工作者。

生产者

```python

import pika

import sys

import random

def main():

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"

message += f" {random.randint(1, 10)}"

channel.basic_publish(

exchange='',

routing_key='task_queue',

body=message,

properties=pika.BasicProperties(delivery_mode=2)) # make message persistent

print(" [x] Sent %r" % message)

connection.close()

if __name__ == '__main__':

main()

```

工作者

```python

import pika

import time

def callback(ch, method, properties, body):

print(" [x] Received %r" % body)

time.sleep(body.count(b'.'))

print(" [x] Done")

ch.basic_ack(delivery_tag=method.delivery_tag)

def main():

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

print(' [*] Waiting for messages. To exit press CTRL+C')

channel.basic_qos(prefetch_count=1)

channel.basic_consume(queue='task_queue',

on_message_callback=callback)

channel.start_consuming()

if __name__ == '__main__':

main()

```

http://www.lryc.cn/news/431224.html

相关文章:

  • docker进阶 compose等
  • [详细建模已更新]2024数学建模国赛高教社杯A题:“板凳龙” 闹元宵 思路代码文章助攻手把手保姆级
  • 网络编程(TCP+网络模型)
  • Docker Image 命令
  • 如何在IntelliJ IDEA中将Tab设置为4个空格
  • ASP.NET Core 入门教学十五 异步编程
  • pycharm 2024.1下载、安装
  • 实变函数精解【18】
  • 【深入解析】AI工作流中的HTTP组件:客户端与服务端执行的区别
  • 用亚马逊云科技Graviton高性能/低耗能处理器构建AI向量数据库(上篇)
  • 调用火山云的语音生成TTS和语音识别STT
  • 中间件解析漏洞
  • 如何在Mac电脑上本地部署Stable Diffusion:详细教程(webUI)
  • FPGA随记——移位寄存器
  • Java | Leetcode Java题解之第390题消除游戏
  • 新型PyPI攻击技术可能导致超2.2万软件包被劫持
  • spring cloud gateway 之删除请求头
  • Flutter自动打包ios ipa并且上传
  • 深入理解synchronized的原理是什么
  • Electron32-Vue3OS桌面管理os模板|vite5+electron32+arco后台os系统
  • c++ 定义函数
  • 【深度学习 计算机视觉】计算机视觉工程师所需的和有帮助的基本技能
  • 【CSS】如何写渐变色文字并且有打光效果
  • Android 14(API 级别 34)中,DexClassLoader 不再支持可写 dex/jar 文件
  • Linux -动静态库
  • 原点安全荣获“AutoSec Awards 安全之星”优秀汽车数据安全合规方案奖
  • 2024前端面试题分享
  • 数学基础 -- 线性代数之正交矩阵
  • PostgreSQL 17即将发布,新功能Top 3
  • 心觉:别再做单线程的打工人!换个思路突破