当前位置: 首页 > news >正文

RabbitMQ的Direct Exchange模式实现的消息发布案例

Producer生产者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMQProducer {private final static String EXCHANGE_NAME = "direct_message_exchange";private final static String EXCHANGE_TYPE = "direct";public static void main(String[] args) {// 1. 创建连接工厂,设置连接参数ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672); // RabbitMQ默认端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");try (Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel()) {// 2. 声明交换机 (direct类型,持久化)channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true);// 3. 声明队列 (持久化,非独占,连接断开时不自动删除)channel.queueDeclare("queue5", true, false, false, null);channel.queueDeclare("queue6", true, false, false, null);channel.queueDeclare("queue7", true, false, false, null);// 4. 绑定队列到交换机,设置路由键channel.queueBind("queue5", EXCHANGE_NAME, "order");channel.queueBind("queue6", EXCHANGE_NAME, "order");channel.queueBind("queue7", EXCHANGE_NAME, "course");// 5. 准备要发送的消息String message = "你好,学相伴:www.kuangstudy.com";// 6. 向交换机发送消息,使用路由键 "course"channel.basicPublish(EXCHANGE_NAME, "course", null, message.getBytes("UTF-8"));System.out.println("消息发送成功!");} catch (Exception ex) {// 捕获异常并打印堆栈信息ex.printStackTrace();System.out.println("消息发送出现异常...");} finally {// 在try-with-resources中,不再需要显式关闭连接和通道// 会自动关闭连接和通道}}
}

功能点:

  1. 声明了一个Direct类型的交换机,并绑定了三个队列(queue5queue6queue7)。其中queue5queue6都绑定到order路由键,而queue7绑定到course路由键。
  2. 发送了一条消息到course路由键绑定的队列中(即queue7)。

Consumer消费者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class RabbitMQConsumer {private final static String QUEUE_NAME = "queue7"; // 与生产者的绑定一致private final static String EXCHANGE_NAME = "direct_message_exchange";private final static String EXCHANGE_TYPE = "direct";public static void main(String[] args) {// 1. 创建连接工厂,设置连接参数ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672); // RabbitMQ默认端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");try (Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel()) {// 2. 声明交换机和队列,与生产者保持一致channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true);channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 3. 绑定队列到交换机,路由键为"course"channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "course");System.out.println(" [*] 等待接收消息...");// 4. 定义接收消息的回调函数DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] 接收到的消息: '" + message + "'");// 这里可以添加进一步的消息处理逻辑};// 5. 开始消费消息 (自动应答)channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });} catch (Exception ex) {// 捕获异常并打印堆栈信息ex.printStackTrace();System.out.println("消费者运行中出现异常...");}}
}

功能点: 

   1.  与生产者保持一致:消费者的队列名称、交换机名称和路由键与生产者保持一致,即监听queue7队列,并接收路由键为course的消息。

   2. 回调函数处理消息:使用DeliverCallback来定义收到消息后的处理逻辑。在回调函数中,delivery.getBody()获取消息内容,随后可以对消息进行处理、存储或其他业务逻辑操作。

   3 自动应答basicConsume中的true表示自动应答(auto-acknowledge),即消息处理完毕后,RabbitMQ会自动确认消息已成功处理。如果需要手动应答,可以将true替换为false,并在处理完成后调用channel.basicAck()来手动确认消息。

http://www.lryc.cn/news/435900.html

相关文章:

  • 数据结构-二叉树-基础知识
  • wangeditor——cdn引入的形式创建一个简易版编辑器——js技能提升
  • 9.11.
  • 【GeekBand】C++设计模式笔记1_介绍
  • MySQL 数据库:原理、应用与发展
  • 7.2图像旋转
  • 学学vue-2
  • 什么是 Grafana?
  • 【Prompt Engineering:思维树 (ToT)、检索增强生成 (RAG)、自动推理并使用工具 (ART)】
  • 【习题】应用/元服务上架
  • 性能测试的复习3-jmeter的断言、参数化、提取器
  • ORB-SLAM2关键点总结
  • 拱式桥安全结构健康监测解决方案
  • windows和linux安装mysql5.7.31保姆级教程
  • 如何使用 PowerShell 脚本来自动化 Windows 开发流程的教程(包括理论介绍和实践示例)
  • CTFHub技能树-信息泄露-HG泄漏
  • OpenCV结构分析与形状描述符(18)比较两个轮廓相似度的函数matchShapes()的使用
  • CCS811二氧化碳传感器详解(STM32)
  • Navicat 17 新特性 | 聚焦 MongoDB
  • openssl的使用
  • ICETEK-DM6437-AICOM—— DMA直接存储器访问设计
  • 【AcWing】快速排序的Go实现
  • 使用C++11的`std::future`和`std::promise`实现异步网络通信
  • 【C++登堂入室】类与对象(上)
  • 【西电电装实习】5. 无人机模块及作用、上位机的操作
  • 有关WSL和docker的介绍
  • 以太坊入门
  • 秃姐学AI系列之:实战Kaggle比赛:狗的品种识别(ImageNet Dogs)
  • 图神经网络介绍3
  • 浅谈 React Fiber