当前位置: 首页 > news >正文

RocketMq集成SpringBoot(待完善)

环境

jdk1.8, springboot2.7.3

Maven依赖

        <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.3</version><relativePath/> <!-- lookup parent from repository --></parent>        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version><exclusions><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.5</version></dependency>

配置文件

rocketmq.name-server=192.168.6.128:9876
#生产通用群组, 也可单独指定
rocketmq.producer.group=springBootGroup
#消费通用群组, 也可单独指定
rocketmq.consumer.group=testGroup
server.port=9000

代码

生产者发送消息

@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate ProducerService producerService;// 发送同步消息@PostMapping("/sendSync")public Object sendSync(@RequestBody MessageReq req) {return producerService.sendSyncMessage(req.getTopic(), req.getTag(), req.getMessage());}// 发送异步消息@PostMapping("/sendAsync")public Object sendAsyncMessage(@RequestBody MessageReq req) {producerService.sendAsyncMessage(req.getTopic(), req.getTag(), req.getMessage());return "200";}
}
@Service
public class ProducerService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送同步消息.* @return 发送结果*/public SendResult sendSyncMessage(String topic, String tag, String message) {// param1: topic和tag冒号分隔return rocketMQTemplate.syncSend(topic + ":" + tag, message);}/*** 发送异步消息.*/public void sendAsyncMessage(String topic, String tag, String message) {rocketMQTemplate.convertAndSend(topic + ":" + tag, message);}
}

消费者

@Component
@RocketMQMessageListener(consumerGroup = "SimpleStringConsumerGroup",  // consumerGroup:消费者组名topic = "MQ_sp_test1",                         // topic:订阅的主题selectorExpression = "Tag-kk||Tag-kk2",         // selectorExpression, 1. 根据Tag过滤, 多个用||分割, 也可设置*; 2. 根据SQL92语法过滤
//        selectorExpression = "*",
//        selectorType = SelectorType.SQL92,             // 设置SQL92语法过滤, 不设置默认TAGmessageModel = MessageModel.CLUSTERING,  // messageModel: 控制消息模式。MessageModel.CLUSTERING:负载均衡;MessageModel.BROADCASTING:广播模式consumeMode= ConsumeMode.CONCURRENTLY    // CONCURRENTLY: 无序消费; ORDERLY: 有序消费
)
public class SimpleConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message : "+ message);}
}

测试

同步消息

异步消息 

TAG过滤消息

1. 消费者指定了TAG, 不满足的不会消费, 状态是CONSUMED_BUT_FILTERED

消费端接收消息 

http://www.lryc.cn/news/258860.html

相关文章:

  • 刚学Python有点难怎么办?这是好事啊!
  • LNMP网站架构分布式搭建部署
  • lwIP 细节之六:connected、sent、poll 回调函数是何时调用的
  • C语言搭建项目-学生管理系统(非链表)
  • 美易官方:投资美股证券投资组合的优势及快速上手指南
  • centos日常运维随记
  • 设计模式之观察者模式(主题对象发生变化,通知各个观察者)
  • vue+高德,百度地图
  • 工信部举行发布会 数字化产业推动元宇宙发展取得良好成效
  • 有没有手机电脑同步的工作时间管理软件?
  • docker安装及简单使用(Linux版本)
  • 山西电力市场日前价格预测【2023-12-10】
  • 在OpenCV基于深度学习的超分辨率模型实践
  • beebox靶场A3 中等级别 xss通关教程
  • 前端知识笔记(二)———Django与Ajax
  • C++新经典模板与泛型编程:用成员函数重载实现is_base_of
  • 【vue3】处理数组方法,在数组中获取指定条件所在的数组对象等持续更新笔记~~
  • digit函数
  • Linux中的堡垒机搭建以及使用
  • ubuntu安装微信客户端
  • ajax清空所有表单内容,包括input标签、单选框radio、多选框CheckBox、下拉框select以及文本域内容
  • 通配符用法
  • 如何从eureka-server上进行服务发现,负载均衡远程调用服务
  • Flutter实现Android拖动到垃圾桶删除效果-Draggable和DragTarget的详细讲解
  • Nacos和Eureka冲突问题原因分析
  • 『C++成长记』拷贝构造函数
  • B 站基于 StarRocks 构建大数据元仓
  • 最常用的4种光纤接口结构
  • Axure网页端高交互组件库, 下拉菜单文件上传穿梭框日期城市选择器
  • 基于Java新人入职管理系统