当前位置: 首页 > news >正文

RocketMQ的简单使用

这里需要创建2.x版本的springboot项目 

导入依赖

    <dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.6</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency></dependencies>

定义配置文件

server:port: 3000rocketmq:name-server: xxx.xxx.xxx.xxx:9876  # NameServer 地址producer:group: rocketmq-4x-service_common-message-execute_pg # 全局发送者组定义

生产者定义

这里的生产者有两个,一个是普通的,一个是延时。

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.yhy.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.messaging.support.MessageBuilder;@Component
@Slf4j
public class GeneralMessageDemoProduce {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public SendResult sendMessage(String topic, String tag, String keys, MessageEvent messageSendEvent) {SendResult sendResult;try{StringBuilder destinationBuilder = StrUtil.builder().append(topic);if(StrUtil.isNotBlank(tag)){destinationBuilder.append(":").append(tag);}Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS,keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();// 设置消息的延时级别sendResult=rocketMQTemplate.syncSend(destinationBuilder.toString(),message,2000L);log.info("[普通消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);}catch(Throwable ex){log.error("[普通消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}
}

延时的

@Component
@Slf4j
public class ScheduleProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public SendResult sendMessage(String topic, String tag, String keys, MessageEvent messageSendEvent ) {SendResult sendResult;try {StringBuilder destinationBuilder = StrUtil.builder().append(topic);if(StrUtil.isNotBlank(tag)){destinationBuilder.append(":").append(tag);}Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS,keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();// 设置消息的延时级别sendResult=rocketMQTemplate.syncSend(destinationBuilder.toString(),message,2000L,6);log.info("[延时消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);}catch(Throwable ex){log.error("[延时消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}
}

消费者定义

这里也是两个消费者,普通的和延时的不在同一个主题的内

@Slf4j
@Component
@RocketMQMessageListener(topic = "rocketmq-yhy_topic",selectorExpression = "general",consumerGroup = "rocketmq-demo_general-message_cg"
)
public class GeneralMessageDemoConsume implements RocketMQListener<MessageEvent> {@Overridepublic void onMessage(MessageEvent message) {log.info("接到RocketMQ消息,消息体:{}", JSON.toJSONString(message));}
}
import com.alibaba.fastjson.JSON;
import com.yhy.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = "Delay",selectorExpression = "general",consumerGroup = "rocketmq-demo_general-message_cg"
)
public class GeneralMessageDemoConsume_Delay implements RocketMQListener<MessageEvent> {@Overridepublic void onMessage(MessageEvent message) {log.info("接到RocketMQ的延时消息,消息体:{}", JSON.toJSONString(message));}
}

发送消息

这里直接在启动类发送。

@SpringBootApplication
@RestController
public class RocketMQDemoApplication {@Autowiredprivate GeneralMessageDemoProduce generalMessageDemoProduce;@Autowiredprivate ScheduleProducer scheduleProducer;@PostMapping("/test/send/general-message")public String sendGeneralMessage() {String keys= UUID.randomUUID().toString();MessageEvent messageEvent=new MessageEvent("消息具体内容——yhy",keys);SendResult sendResult=generalMessageDemoProduce.sendMessage("rocketmq-yhy_topic","general",keys,messageEvent);SendResult sendResult2=scheduleProducer.sendMessage("Delay","general",keys,messageEvent);System.out.println(sendResult.getSendStatus().name() );System.out.println(sendResult2.getSendStatus().name());return sendResult.getSendStatus().name();}public static void main(String[] args) {SpringApplication.run(RocketMQDemoApplication.class, args);}
}

postman触发

http://www.lryc.cn/news/333346.html

相关文章:

  • 速盾:服务器有cdn 带宽上限建议多少
  • 智慧工地安全+绿色施工方案
  • SQL Server 存储过程:BBS论坛(表结构文档下载及30个存储过程)
  • 03 Python进阶:MySQL - mysql-connector
  • InnoDB 行记录格式(“存储一行行数据的结构“)
  • 【洛谷】P9236 [蓝桥杯 2023 省 A] 异或和之和
  • ThreadLocal加切面实现线程级别的方法缓存
  • 使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
  • 对代理模式的理解
  • #QT项目实战(天气预报)
  • 数据挖掘|关联分析与Apriori算法详解
  • ChatGPT Excel 大师
  • C 语言中的 end, _end 符号
  • 绿联 安装PDF工具
  • 备战蓝桥杯---数论相关问题
  • 苹果手表Apple Watch录了两个半小时的录音,却只能播放4秒,同步到手机也一样,还能修复好吗?
  • RGB三通道和灰度值的理解
  • ARM、X86、RISC-V三分天下
  • 力控机器人原理及力控制实现
  • 最小生成树
  • 二维动画制作软件 Animate 2024 for mac激活版
  • 相对论中关于光速不变理解的补充
  • 面试(04)————JavaWeb
  • Debian12 使用 nginx 与 php8.2 使用 Nextcloud
  • Java设计模式:代理模式的静态和动态之分(八)
  • 【论文通读】AgentStudio: A Toolkit for Building General Virtual Agents
  • wordvect嵌入和bert嵌入的区别
  • 渗透测试练习题解析 5(CTF web)
  • PCA(Principal Component Analysis,主成分分析)
  • 干货 | 探索CUTTag:从样本到文库,实验步步为营!