rabbitmq中的消息确认
如何保证消息被全部消费
应用场景:我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。
- 为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。
- 如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,即使工作者(workers)偶尔的挂掉,也不会丢失消息。
示例
消费者
import time
import pika
from helloWorld.config import userName, passwordcredentials = pika.credentials.PlainCredentials(userName, password)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', virtual_host='/', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='hello-02', durable=True) # durable=True设置消息持久化def callback(ch, method, properties, body):time.sleep(1) # 模拟耗时任务print(" [x] Received %r" % body)ch.basic_ack(delivery_tag=method.delivery_tag) # 此设置是为了有work掉线时,存活的work完成各人原有任务后,队列中掉线的work未处理的任务会重新分配给存活的workchannel.basic_qos(prefetch_count=1) # 公平调度;告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)channel.basic_consume('hello-02',callback,# auto_ack=True # auto_ack=True时,消息响应机制将被关闭auto_ack=False)
# 行一个用来等待消息数据并且在需要的时候运行回调函数的无限循环
channel.start_consuming()
关键点
ch.basic_ack(delivery_tag=method.delivery_tag)
手动确认消息被正常消费channel.basic_qos(prefetch_count=1)
启用公平调度,如果不需要新启work消费消息,可以不设置auto_ack=False
关闭消息自动确认,避免消息发送出去后,就被队列移除消息