当前位置: 首页 > news >正文

【开发篇】二十、SpringBoot整合RocketMQ

文章目录

  • 1、整合
  • 2、消息的生产
  • 3、消费
  • 4、发送异步消息
  • 5、补充:安装RocketMQ

在这里插入图片描述

1、整合

首先导入起步依赖,RocketMQ的starter不是Spring维护的,这一点从starter的命名可以看出来(不是spring-boot-starter-xxx,而是xxx-spring-boot-starter,和MyBatisPlus、Druid一样),因此version值得自己加:

<dependency>   <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version>
</dependency>

添加相关配置:

rocketmq:  name-server: localhost:9876  producer:    group: group_rocketmq  # 设置一个自定义的生产者默认组名,省掉这个启动会报错

在需要的地方注入RocketMQTemplate操作对象:

@Autowired    
private RocketMQTemplate rocketMQTemplate;

2、消息的生产

发送消息继续convertAndSend方法,接着上篇在Service层来演示:

@Service
@Slf4j
public class MessageServiceRocketmqImpl implements MessageService {    @Autowired    private RocketMQTemplate rocketMQTemplate;    @Override    public void sendMessage(String id) {        rocketMQTemplate.convertAndSend("order_sm_id",id);      log.info("使用Rabbitmq将待发送短信的订单纳入处理队列,id:"+id);    }
}

convertAndSend方法依旧重载,可以直接传一个Object,也可以先传一个destination参数,即发到哪儿,再传要发的message

3、消费

这里不演示手动receive方法拿消息,直接用监听器自动拿来消费:实现RocketMQListener接口,泛型为Message类型,重写onMessage方法,加@RocketMQMessageListener注解,两个属性为主题名称和消费者组

@Component
@Slf4j
@RocketMQMessageListener(topic="order_sm_id",consumerGroup = "group_rocketmq")
public class RocketmqMessageListener implements RocketMQListener<String> {  @Override    public void onMessage(String id) {        log.info("已完成短信发送业务,id:"+id);    }
}

4、发送异步消息

@Service
@Slf4j
public class MessageServiceRocketmqImpl implements MessageService {   @Autowired    private RocketMQTemplate rocketMQTemplate;    @Override   public void sendMessage(String id) {        //回调逻辑SendCallback callback = new SendCallback() {            @Override            public void onSuccess(SendResult sendResult) {                //消息发送成功后你要做的业务//...log.info("消息发送成功");            }     @Override            public void onException(Throwable throwable) {               log.info("消息发送失败!!!!!!!!!!!");            }        };     //异步发送rocketMQTemplate.asyncSend("order_sm_id",id,callback);  log.info("使用Rabbitmq将待发送短信的订单纳入处理队列,id:"+id);      }
}

asyncSend异步发消息,有个参数是callback回调方法,类型是一个接口,创建这个对象的时候重写onSuccess和OnException方法,即消息发送成功以后的逻辑和消息发送失败以后的逻辑(异步的体现,不用等,来个回调)。

5、补充:安装RocketMQ

建议以Docker方式启动,下面备份下在Windows的安装(安装为一个系统服务):

  • 下载
下载地址:https://rocketmq.apache.org/
  • 安装:解压缩即可
默认服务端口:9876
  • 环境变量配置
ROCKETMQ_HOME
PATH
NAMESRV_ADDR (建议): 127.0.0.1:9876
  • 启动命名服务器:
mqnamesrv
  • 启动Broker
mqbroker
  • 服务器功能测试:生产数据
tools org.apache.rocketmq.example.quickstart.Producer
  • 服务器功能测试:消费数据
tools org.apache.rocketmq.example.quickstart.Consumer
http://www.lryc.cn/news/184897.html

相关文章:

  • OpenCV实现求解单目相机位姿
  • 深入解析PostgreSQL:命令和语法详解及使用指南
  • Elasticsearch数据搜索原理
  • vue模版语法-{{}}/v-text/v-html/v-once
  • 前端埋点上传
  • 第11章 Redis(一)
  • freertos信号量之二值信号量
  • notepad++ 如何去除换行
  • PPT NO.2 ​插入透明校徽
  • Linux系统部署PostgreSQL 单机数据库
  • 好用的办公摸鱼神器
  • 手写Java序列化工具
  • mysql面试题26:MySQL中什么是MVCC,它的底层原理是什么
  • SQL进阶 - SQL的编程规范
  • [NISACTF 2022]babyserialize - 反序列化+waf绕过【*】
  • docker部署Vaultwarden密码共享管理系统
  • 低代码开发技术选型
  • 在vue2中,v-model和.sync的区别
  • nginx 配置
  • 【计算机视觉|人脸建模】学习从图像中回归3D面部形状和表情而无需3D监督
  • Linux系统之部署h5ai目录列表程序
  • Java-Exception
  • C++并发与多线程(2) | 线程运行开始和结束的基本方式
  • vue3前端开发-flex布局篇
  • 网络是什么?(网络零基础入门篇)
  • 【JavaEE】线程安全的集合类
  • 【C++算法】is_partitioned、partition_copy和partition_point
  • MyBatis(JavaEE进阶系列4)
  • 『力扣每日一题15』:买卖股票的最佳时机
  • Java中栈实现怎么选?Stack、Deque、ArrayDeque、LinkedList(含常用Api积累)