当前位置: 首页 > news >正文

Springboot3.0 集成 RocketMQ5

1 引入依赖
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.1</version>
</dependency>2配置
rocketmq:name-server: 192.168.150.50:9876producer:group: dist-test # 生产者组pull-consumer: # pull模式消费者group: testtopic: MyTopic3 启动类引入配置
创建主题:
[root@localhost bin]# sh mqadmin updateTopic -n 192.168.150.50:9876 -b 192.168.150.50:10911 -t MyTopic -w 4 -r 4
create topic to 192.168.150.50:10911 success.
TopicConfig [topicName=MyTopic, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]-n 192.168.150.50:9876:Name Server地址。
-b 192.168.150.50:10911:Broker地址。
-t MyTopic:主题名称。
-w 4:写队列数量。
-r 4:读队列数量。引入配置类
@SpringBootApplication
@ImportAutoConfiguration({RocketMQAutoConfiguration.class})4 测试,用SendController测试发送,用SendController测试接收// RocketMQReplyListener<T, R> 是一个接口,需要返回值的监听器,两个泛型分别是接收消息的类型和返回值类型,对应的发送者rocketMQTemplate.sendAndReceive
// RocketMQListener<T> 无需返回值 T 为接收消息的类型,对应的发送者rocketMQTemplate.convertAndSendbroker的配置也可以不用配置
namesrvAddr=
autoCreateTopicEnable=true
brokerIP1=192.168.150.50
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/rocketmq")
public class SendController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send")public String send() {/*** 普通消息:只负责发送无需等待响应* 参数1:topic:tag tag可省略不写* 参数2:Object类型的消息体* 消费监听器:一般配合着 RocketMQListener<T> 使用*/rocketMQTemplate.convertAndSend("MyTopic2", "hello world");//        Message message = new Message("topic", "tag", "key", "message body".getBytes());
//        message.putUserProperty("REPLY_TO_CLIENT", "yourClientID"); // Set the reply property
//        producer.send(message);return "success";}@GetMapping("/send2")public String send2() {/*** 普通消息:等待消费者响应* 参数信息和上面一样* 消费者监听器:一般配合着 RocketMQReplyListener<T, R> 使用*/String res = rocketMQTemplate.sendAndReceive("MyTopic", "hello RocketMQ", String.class);return res;}
}

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;// RocketMQReplyListener<T, R> 是一个接口,需要返回值的监听器,两个泛型分别是接收消息的类型和返回值类型,对应的发送者rocketMQTemplate.sendAndReceive
// RocketMQListener<T> 无需返回值 T 为接收消息的类型,对应的发送者rocketMQTemplate.convertAndSend@Slf4j
@Component
@RocketMQMessageListener(topic = "MyTopic", consumerGroup = "test_consumer")
public class TestRocketMQMessageListener implements RocketMQReplyListener<String, String> {@Overridepublic String onMessage(String s) {log.info("接收到RocketMQ消息[topic={}] ======> {}", "test", s);return SendStatus.SEND_OK.name();}
}

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;// RocketMQReplyListener<T, R> 是一个接口,需要返回值的监听器,两个泛型分别是接收消息的类型和返回值类型,对应的发送者rocketMQTemplate.sendAndReceive
// RocketMQListener<T> 无需返回值 T 为接收消息的类型,对应的发送者rocketMQTemplate.convertAndSend@Slf4j
@Component
@RocketMQMessageListener(topic = "MyTopic2", consumerGroup = "test_consumer2")
public class TestRocketMQMessageListener2 implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {log.info("接收到RocketMQ消息[topic={}] ======> {}", "test", s);}
}
http://www.lryc.cn/news/600137.html

相关文章:

  • 理解Spring中的IoC
  • 数字增加变化到目标数值动画,js实现
  • 2025年-ClickHouse 高性能实时分析数据库(大纲版)
  • cha的操作
  • 最新Amos 29下载及详细安装教程,附免激活中文版Amos安装包
  • Nature Communications:复杂光下多维视觉信息处理,利用时间演变的环境极化敏感神经突触器件
  • 基于Docker的GPU版本飞桨PaddleOCR部署深度指南(国内镜像)2025年7月底测试好用:从理论到实践的完整技术方案
  • JavaScript 中 let 在循环中的作用域机制解析
  • 【深度学习新浪潮】Claude code是什么样的一款产品?
  • swiper.js实现名录上下滚动
  • Python 条件分支与循环详解--python004
  • 【Agent】API Reference Manual(API 参考手册)
  • 【Spring AI详解】开启Java生态的智能应用开发新时代(附不同功能的Spring AI实战项目)
  • 手写A2C(FrozenLake环境)
  • 牛客刷题记录01
  • 【C++】二叉搜索数
  • 流式接口,断点续传解决方案及实现
  • QKV 为什么是三个矩阵?注意力为何要除以 √d?多头注意力到底有啥用?
  • 【PyTorch】图像多分类项目
  • jwt 在net9.0中做身份认证
  • qt框架,使用webEngine如何调试前端
  • 开发笔记 | 优化对话管理器脚本与对话语音的实现
  • 13.使用C连接mysql
  • Jenkins中出现pytest: error: unrecognized arguments: --alluredir=report错误解决办法
  • 栈----1.有效的括号
  • 机器学习 KNN 算法,鸢尾花案例
  • 从Taro的Dialog.open出发,学习远程控制组件之【事件驱动】
  • C++ 多线程同步机制详解:互斥锁、条件变量与原子操作
  • Python Multiprocessing 进程池完全教程:从理论到实战
  • 数据结构(3)单链表