当前位置: 首页 > news >正文

RocketMQ集成Springboot --Chapter1

RocketMQ集成Springboot 三种消息发送方式

生产者

引入依赖

<!--⽗⼯程--><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.2.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency></dependencies>
/***生产者的代码书写demo**/
@SpringBootTest
public class RockerMQTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void sendMsg(){Message msg = MessageBuilder.withPayload("发送同步消息1").build();rocketMQTemplate.send("helloTopicBoot",msg);}/*** 异步发送消息,成功或者失败之后进行回调*/@Testpublic void sendASYNCMsg() throws InterruptedException {System.out.println("发送前");Message msg = MessageBuilder.withPayload("boot发送异步消息").build();rocketMQTemplate.asyncSend("helloTopicBoot", msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("消息发送状态:"+sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {System.out.println("消息发送失败");}});System.out.println("发送完毕");//睡五秒,不睡的话整个方法就结束了不能进行回调了TimeUnit.SECONDS.sleep(5);}/*** 一次性消息无论消息的结果是什么,通常用于日志等丢失一小部分数据无关紧要的情况下使用*/@Testpublic void sendOnewayMsg(){Message msg = MessageBuilder.withPayload("boot发送一次性消息").build();rocketMQTemplate.sendOneWay("helloTopicBoot",msg);}
}
//application.yml配置文件
rocketmq:name-server: 10.0.0.130:9876producer:group: my-group
消费者

引入依赖

    <!--⽗⼯程--><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.2.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency></dependencies>

实现一个监听器对象,重写其中的消费消息的方法。使用注解@RocketMQMessageListener(consumerGroup = “htpConsumerGroup”,topic = “helloTopicBoot”)
consumerGroup组必须是唯一的,helloTopicBoot表示要监听的主题

@Component
@RocketMQMessageListener(consumerGroup = "htpConsumerGroup",topic = "helloTopicBoot")
public class HelloTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("收到的消息:"+new String(messageExt.getBody(), Charset.defaultCharset()));}
}

最后生产者启动测试类发送消息,消费者运行主程序一直运行即可.

http://www.lryc.cn/news/95522.html

相关文章:

  • 【Unity3D日常开发】Unity3D中比较string字符串的常用方法
  • vue3+ts+element-plus 之使用node.js对接mysql进行表格数据展示
  • 华为eNSP:isis配置跨区域路由
  • IUPAC和SMILES的相互转换
  • 逻辑回归概述
  • React 框架下自己写一个braft编辑器,然后将编辑器内容展示在网页端
  • 基于DNN深度学习网络的OFDM+QPSK信号检测算法matlab仿真
  • 学生管理系统-05封装选项卡
  • 关于一些C++、Qt、Python方面的术语
  • k8s中强制删除pv
  • 60寸透明屏的透明度怎么样?
  • Python:使用openpyxl读取Excel文件转为json数据
  • 在Microsoft Excel中如何快速合并表格
  • 【RS】基于规则的面向对象分类
  • SWF格式视频怎么转换成AVI格式?简单的转换方法分享
  • Hive数据仓库
  • 公网访问的Linux CentOS本地Web站点搭建指南
  • ChatGPT:人机交互新境界,AI智能引领未来
  • 微信小程序值相同的数据,一个数据setDate修改后,另一个值相同的数据也会修改
  • Spring5学习笔记 — IOC
  • DevOps自动化平台开发之 Shell脚本执行的封装
  • STM32CubeIDE(I2C)
  • http 请求报文响应报文的格式以及Token cookie session 区别
  • 智能汽车的主动悬架工作原理详述
  • vue2和vue3的一些技术点复习
  • 安装nvm 切换node版本
  • 【html中的BFC是什么】
  • 苹果账号被禁用怎么办
  • 跨境出海企业,如何防范恶意退货欺诈
  • 数据出境要依法“过安检”!什么是数据出境?