当前位置: 首页 > news >正文

SpringBoot整合RocketMQ(rocketmq-client.jar)

目录

配置pom.xml

配置application.properties

生产者配置 MQProducerConfig.java

消费者配置MQConsumerConfig.java

消费者监听 MQConsumeMsgListenerProcessor.java

发送消息


Springboot集成RocketMQ:通过直接引入rocketmq-client依赖实现基础集成,需手动配置生产者和消费者。

配置pom.xml

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.0</version>
</dependency>

配置application.properties

# 是否开启自动配置
rocketmq.producer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
rocketmq.producer.groupName=GID_abc
# mq的nameserver地址
rocketmq.producer.namesrvAddr=localhost:9876
# 消息最大长度 默认 1024 * 4 (4M)
rocketmq.producer.maxMessageSize = 4096
# 发送消息超时时间,默认 3000
rocketmq.producer.sendMsgTimeOut=3000
# 发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=2# 是否开启自动配置
rocketmq.consumer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
rocketmq.consumer.groupName=GID_abc
# mq的nameserver地址
rocketmq.consumer.namesrvAddr=localhost:9876
# 消费者订阅的主题topic和tags(*标识订阅该主题下所有的tags),格式: topic~tag1||tag2||tags3;
rocketmq.consumer.topics=abcTopic~*
# 消费者线程数据量
rocketmq.consumer.consumeThreadMin=5
rocketmq.consumer.consumeThreadMax=32
# 设置一次消费信心的条数,默认1
rocketmq.consumer.consumeMessageBatchMaxSize=1

生产者配置 MQProducerConfig.java

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Getter
@Setter
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.producer")
public class MQProducerConfig {public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfig.class);private String groupName;private String namesrvAddr;// 消息最大值private Integer maxMessageSize;// 消息发送超时时间private Integer sendMsgTimeOut;// 发送失败重试次数private Integer retryTimesWhenSendFailed;@Bean@ConditionalOnProperty(prefix = "rocketmq.producer", value = "isOnOff", havingValue = "on")public DefaultMQProducer defaultProducer() throws MQClientException {LOGGER.info("-----defaultProducer 正在创建-----");DefaultMQProducer producer = new DefaultMQProducer(groupName);producer.setNamesrvAddr(namesrvAddr);producer.setVipChannelEnabled(false);producer.setMaxMessageSize(maxMessageSize);producer.setSendMsgTimeout(sendMsgTimeOut);producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);producer.start();LOGGER.info("-----rocketmq producer server 开启成功-----");return producer;}
}

消费者配置MQConsumerConfig.java

import com.bestone.online.consult.receiverMq.MQConsumeMsgListenerProcessor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Getter
@Setter
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.consumer")
public class MQConsumerConfig {public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfig.class);private String groupName;private String namesrvAddr;private String topics;// 消费者线程数据量private Integer consumeThreadMin;private Integer consumeThreadMax;private Integer consumeMessageBatchMaxSize;@Autowiredprivate MQConsumeMsgListenerProcessor consumeMsgListenerProcessor;@Bean@ConditionalOnProperty(prefix = "rocketmq.consumer", value = "isOnOff", havingValue = "on")public DefaultMQPushConsumer defaultConsumer() {LOGGER.info("-----defaultConsumer 正在创建-----");DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr(namesrvAddr);consumer.setConsumeThreadMin(consumeThreadMin);consumer.setConsumeThreadMax(consumeThreadMax);consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);// 设置监听consumer.registerMessageListener(consumeMsgListenerProcessor);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);try {// 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,String[] topicArr = topics.split(";");for (String topic : topicArr) {String[] tagArr = topic.split("~");consumer.subscribe(tagArr[0], tagArr[1]);}consumer.start();LOGGER.info("-----consumer 创建成功 groupName={}, topics={}, namesrvAddr={}-----", groupName, topics, namesrvAddr);} catch (MQClientException e) {LOGGER.error("-----consumer 创建失败!-----");}return consumer;}
}

消费者监听 MQConsumeMsgListenerProcessor.java

import com.alibaba.fastjson.JSONObject;
import java.util.Date;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;/*** 消费者监听*/
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {private static final Logger LOGGER = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);/*** 默认msg里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息* 不要抛异常,如果没有return CONSUME_SUCCESS,consumer会重新消费该消息,直到return CONSUME_SUCCESS*/@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList,ConsumeConcurrentlyContext consumeConcurrentlyContext) {if (CollectionUtils.isEmpty(msgList)) {LOGGER.info("MQ接收消息为空,直接返回成功");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}MessageExt messageExt = msgList.get(0);LOGGER.info("MQ接收到的消息为:" + messageExt.toString());try {String topic = messageExt.getTopic();String tags = messageExt.getTags();String data = new String(messageExt.getBody(), "utf-8");LOGGER.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, data);// TODO 处理业务逻辑} catch (Exception e) {LOGGER.error("获取MQ消息内容异常{}", e);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}

发送消息

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/mqProducer")
public class MQProducerController {public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerController.class);@AutowiredDefaultMQProducer defaultMQProducer;/*** 发送简单的MQ消息*/@GetMapping("/send")public void send(String msg)throws InterruptedException, RemotingException, MQClientException, MQBrokerException {LOGGER.info("发送MQ消息内容:" + msg);Message msg = new Message("abcTopic",   msg.getBytes());// 延时级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"// 设置消息延迟级别为3,也就是延迟10s。msg.setDelayTimeLevel(3);// 定时发送消息,5秒之后发送msg.setDeliveryTimestamp(System.currentTimeMillis() + Duration.ofSeconds(5).toMillis());// 默认3秒超时SendResult sendResult = defaultMQProducer.send(msg);LOGGER.info("消息发送响应:" + sendResult.toString());}
}
http://www.lryc.cn/news/603308.html

相关文章:

  • Python day28
  • 【智能协同云图库】智能协同云图库第八弹:基于阿里云百炼大模型—实现 AI 扩图功能
  • 2025年科研算力革命:8卡RTX 5090服务器如何重塑AI研究边界?
  • 0基礎網站開發技術教學(一) --(前端篇)--
  • 思途SQL学习 0729
  • 【CUDA显存不足的问题】
  • ironSource Ads Bidding 现已正式加入TopOn 聚合平台
  • 博弈论03——混合纳什均衡的收益求法
  • 【Linux入坑(一)—全志T133开发板适配欣瑞达LVDS 7寸(800*480)屏幕】
  • 函数对象 vs 函数指针 vs lambda:该用哪个才高效?
  • python学习DAY26打卡
  • Java高级技术知识点
  • GitLab的安装及使用
  • 路由器路由协议详解:从 RIP 到 OSPF 的技术演进
  • 理解Transformer解码器
  • 【术语扫盲】MCU与MPU
  • 《HCIA-Datacom 认证》希赛三色笔记:Vlan间三层通信过程解析
  • 高级08-Java JVM调优:优化你的Java应用
  • 面向对象系统的单元测试层次
  • 医疗AI新基建:MCP与A2A协议的破局与前瞻
  • MySQL——MVCC
  • Django自带的加密算法
  • 汇总10个高质量免费AI生成论文网站,支持GPT4.0和DeepSeek-R1
  • 云端文档管理新纪元:Paperless-ngx与cpolar打造的无边界文件生态
  • PHP性能优化与高并发处理:从基础到高级实践
  • 深入理解Java Map的entrySet()方法
  • VLA--Gemini Robotics On-Device: 将AI带到本地机器人设备上
  • 在WSL中配置VS Code C++开发环境完整教程
  • LeetCode 1616.分割两个字符串得到回文串
  • 【21】C# 窗体应用WinForm ——图片框PictureBox属性、方法、实例应用