AI编程:python测试MQ消息服务联接和消息接收
业务背景:新项目需要与OA流程集成,即OA流程集成时需要将流程信息推到第三方系统,最早的方案是在OA作JAVA开发调第三方系统的接口,将所需内容推到第三方项目。但上了OA ESB后方案是:OA流程归档时调ESB配置的MQ服务;第三方收到MQ消息后,调OA的流程接口取相应的流程信息并处理。系统管理员配置并提供了相关文档后,第三方反馈联不上MQ。初步调查没有找到原因。问题升级到我这。
为了验证到底是什么原因,让AI帮我开发验证一下能不能联上:
提示词:请提供Python 代码实现的对RabbitMQ 消息通道验证是否可以联接 提供 MQ的用户名,密码、vhost、IP地址
import pika
from pika.exceptions import AMQPConnectionErrordef validate_rabbitmq_connection(host, username, password, vhost):try:# 创建凭证对象credentials = pika.PlainCredentials(username, password)# 配置连接参数parameters = pika.ConnectionParameters(host=host,virtual_host=vhost,credentials=credentials)# 尝试建立连接connection = pika.BlockingConnection(parameters)print("连接成功!")connection.close()except AMQPConnectionError as e:print(f"连接失败: {str(e)}")result = validate_rabbitmq_connection(host='192.168.1.X',username='XXX',password='XXX',vhost='XX'
)print(result )
测试是连接成功,可以接上。主要是没有给第三方开发人员提供vhost信息,系统默认用/,而没有/的权限。所以报错。
为了加快开发继续让AI帮我确证一下能不能收到消息。
提示词:请提供Python 代码实现的RabbitMQ 测试程序
以下是一个基于Python的RabbitMQ测试程序示例,涵盖消息发送、接收、持久化、确认机制等核心功能。代码分为**生产者(发送消息)和消费者(接收消息)**两部分,并附有配置说明和使用方法。
生产者(Producer)
【略】
消费者(Consumer)
import pika
import logginglogging.basicConfig(level=logging.INFO,format="%(asctime)s [Consumer] %(levelname)s: %(message)s"
)def on_message_received(ch, method, properties, body):"""消息接收回调函数"""logging.info(f"Received: {body.decode('utf-8')}")ch.basic_ack(delivery_tag=method.delivery_tag) # 手动确认消息def start_consumer(host="192.168.1.XX", port=5672, name="XXXX", password="XXXX"):credentials = pika.PlainCredentials(name, password)parameters = pika.ConnectionParameters(host=host,port=port,virtual_host="XXXX",credentials=credentials)try:connection = pika.BlockingConnection(parameters)channel = connection.channel()# 声明队列和交换器channel.exchange_declare(exchange="test", exchange_type="direct")channel.queue_declare(queue="test_queue", durable=True)channel.queue_bind(#exchange="workflow.status",exchange="test",queue="XXXX",routing_key="test.*.*")# 启动消费者channel.basic_consume(queue="XXXX",on_message_callback=on_message_received,auto_ack=False # 需手动确认消息)logging.info("Waiting for messages...")channel.start_consuming()except KeyboardInterrupt:logging.info("Stopping consumer...")channel.stop_consuming()finally:connection.close()if __name__ == "__main__":start_consumer()
把其中MQ的用户名,密码、vhost、IP地址、通道名更改后,执行。
已经正确执行。
在RabbitMQ服务器上发送一个消息
python端可以正常收到消息。
经验:经多次测试:以下两处必须要与MQ上配置的通道名一致,否则会报以下错误:
pika.exceptions.ChannelClosedByBroker: (404, “NOT_FOUND - no queue
‘XXX’ in vhost ‘XXX’”)
要求我们的开发人员在提升给第三方集成的,必须先自测测试通过没有问题,再提供第三方开发。