当前位置: 首页 > news >正文

解决 Python RabbitMQ/Pika 报错:pop from an empty deque

使用 python 的 pika 包连接rabbitmq,代码如下:

import pika
import threading
import timedef on_message(channel, method_frame, header_frame, body):print(f'on_message thread id: {threading.get_ident()}')delivery_tag = method_frame.delivery_tagprint(body, "start")for i in range(10):print(i)time.sleep(20)print(body, "end")channel.basic_ack(delivery_tag)credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)channel = connection.channel()
channel.queue_declare(queue="standard", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('standard', on_message)print(f'main thread id: {threading.get_ident()}')
try:channel.start_consuming()
except KeyboardInterrupt:channel.stop_consuming()
connection.close()

执行结果:

(Test) [user@user test]$ python mq1.py 
main thread id: 140682766104384
on_message thread id: 140682766104384
b'1' start
0
1
2
3
4
5
6
7
8
9
b'1' end
Traceback (most recent call last):File "mq1.py", line 38, in <module>channel.start_consuming()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1865, in start_consumingself._process_data_events(time_limit=None)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2026, in _process_data_eventsself.connection.process_data_events(time_limit=time_limit)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 833, in process_data_eventsself._dispatch_channel_events()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_eventsimpl_channel._get_cookie()._dispatch_events()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1492, in _dispatch_eventsconsumer_info.on_message_callback(self, evt.method,File "mq1.py", line 24, in on_messagechannel.basic_ack(delivery_tag)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2112, in basic_ackself._flush_output()File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1335, in _flush_outputself._connection._flush_output(lambda: self.is_closed, *waiters)File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_outputraise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')

 从结果来看,异常发生在一次长时间的消费过程(200s)完成后报错,具体为调用channel.basic_ack(delivery_tag)发生报错;推测是此时与MQ Server的连接已经被重置ConnectionResetError(104, 'Connection reset by peer'),此时再主动确认就发生报错。

正确解决方案如下:

"""
@author: Zhigang Jiang
@date: 2022/1/16
@description:
"""
import functools
import pika
import threading
import timedef ack_message(channel, delivery_tag):print(f'ack_message thread id: {threading.get_ident()}')if channel.is_open:channel.basic_ack(delivery_tag)else:# Channel is already closed, so we can't ACK this message;# log and/or do something that makes sense for your app in this case.passdef do_work(channel, delivery_tag, body):print(f'do_work thread id: {threading.get_ident()}')print(body, "start")for i in range(10):print(i)time.sleep(20)print(body, "end")cb = functools.partial(ack_message, channel, delivery_tag)channel.connection.add_callback_threadsafe(cb)def on_message(channel, method_frame, header_frame, body):print(f'on_message thread id: {threading.get_ident()}')delivery_tag = method_frame.delivery_tagt = threading.Thread(target=do_work, args=(channel, delivery_tag, body))t.start()credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue="standard", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('standard', on_message)print(f'main thread id: {threading.get_ident()}')
try:channel.start_consuming()
except KeyboardInterrupt:channel.stop_consuming()
connection.close()

思路是pika是线程不安全的,所以在接收消息和ACK响应消息时需要另外线程。

http://www.lryc.cn/news/122403.html

相关文章:

  • 观察者模式实战
  • 035_小驰私房菜_Qualcomm账号注册以及提case流程
  • uniapp input输入框placeholder文本右对齐
  • 分布式监控平台—zabbix
  • 【leetcode】第一章数组-2
  • 程序使用Microsoft.XMLHTTP对象请求https时出错解决
  • Linux安装配置nginx+php搭建
  • springboot的各种配置
  • OSI七层模型及TCP/IP四层模型
  • MDN-Web APIs
  • 2023国赛数学建模C题思路分析
  • 暑假集训笔记
  • 【枚举+推式子】牛客小白月赛 63 E
  • Android多屏幕支持-Android12
  • python环境下载安装教程,python运行环境怎么下载
  • 【0.2】lubancat鲁班猫4远程ubuntu22.04.2 无需任何安装
  • Flutter 状态管理 Provider
  • 【设计模式】观察者模式
  • ORCA优化器浅析——CDXLOperator Base class for operators in a DXL tree
  • go入门实践四-go实现一个简单的tcp-socks5代理服务
  • div 中元素居中的N种常用方法
  • Java获取指定文件夹下目录下所有视频并复制到另一个地方
  • windows server 2016 搭建使用 svn 服务器教程
  • 【Python】如何判断时间序列数据是否为平稳时间序列或非平稳时间序列?
  • Labview控制APx(Audio Precision)进行测试测量(六)
  • 【Linux】网络协议总结
  • 如何轻松注册企业邮箱?快速掌握超简单的注册技巧!
  • 【行为型设计模式】C#设计模式之观察者模式
  • 《Java面向对象程序设计》学习笔记——第 8 章 设计模式
  • Java学习笔记28——字节流1