当前位置: 首页 > news >正文

RocketMQ延时消息

RocketMQ消息发送基本示例(推送消费者)-CSDN博客

RocketMQ消费者主动拉取消息示例-CSDN博客

RocketMQ顺序消息-CSDN博客

RocketMQ广播消息-CSDN博客

延时消息:

延时消息实现的效果就是产者调用 producer.send 方法后,消息会立即发送到 Broker,并被存储在指定的队列中。RocketMQ 使用内部的延迟队列机制来实现延时消息。消息在到达 Broker 后,会根据设定的延迟级别放入相应的延迟队列。每个延迟级别对应一个特定的延迟时间(如 1 分钟、5 分钟等)。消息在延迟队列中等待,直到延迟时间过去。到达指定时间后,RocketMQ 会将消息移动到实际的消费队列中,这时消息才会对消费者可见。

预定日常定时发送:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h                    18个级别

可以修改broker.conf文件     messageDelayLevel=3s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

message.setDelayTimeLevel(3) 就是选定第3个等级

5.0版本以上支持 指定时间定时发送

message.setDelayTimeMs(10L) 指定时间定时发送.默认支持最大延迟时间3天.

在broker.conf种可以修改 timerMaxDelaySec=2592000   默认最大3天  72 小时

package com.example.rocketmqdemo.scheldule;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalTime;/*** 预定日程生产者* @author hrui* @date 2024/8/1 11:48*/
public class schelduleProducer {public static void main(String[] args) {//创建一个DefaultMQProducer实例,指定生产者组名为"group1"DefaultMQProducer producer = new DefaultMQProducer("schelduleProducer");//生产者组和消费者组是不同概念  不需要相同//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息producer.setNamesrvAddr("xxx.xxx.xxx:9876");try {//启动生产者实例producer.start();//发送10条消息for (int i = 0; i < 2; i++) {//创建消息实例,指定主题为"Topic1",标签为"Tag1",消息内容为"Hello World"加上编号Message message = new Message("scheldule", "Tag1", ("schelduleProducer" + i).getBytes(StandardCharsets.UTF_8));//在发送之前设置定时发送等级//messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h//从1开始18个等级//message.setDelayTimeLevel(5);//交给Broker 1分钟后延迟发送给消费者//自定义时间发送  最大72小时message.setDelayTimeMs(30000L);//30秒producer.send(message);System.out.println("消息定时发送成功,已交给Broker:"+ LocalTime.now());}} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();} finally {//关闭生产者实例,释放资源producer.shutdown();}}
}

package com.example.rocketmqdemo.scheldule;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.time.LocalDate;
import java.time.LocalTime;
import java.util.List;/*** @author hrui* @date 2024/8/1 11:55*/
public class ScheduleConsumer {public static void main(String[] args) {//创建一个DefaultMQPushConsumer实例,指定消费者组名为"group1"//采用长轮询机制,模拟推送效果,但本质上是主动拉取。适合低延迟、高实时性的场景。DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息consumer.setNamesrvAddr("xxx.xxx.xxx:9876");try {//订阅主题"Topic1",过滤标签为"*",表示接收所有消息consumer.subscribe("scheldule", "*");//设置消息监听器,处理接收到的消息//可以传入两种类型的监听器://1. MessageListenerOrderly(顺序消费):保证消息按顺序处理//2. MessageListenerConcurrently(并发消费):消息并发处理,不保证顺序consumer.setMessageListener(new MessageListenerConcurrently() {//consumeMessage方法用于处理接收到的消息列表@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println(Thread.currentThread().getName());for (int i=0;i<list.size();i++){System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));broker是将两条消息分别发送的System.out.println(LocalTime.now());}//返回消费状态,CONSUME_SUCCESS表示消息消费成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者实例,开始接收消息consumer.start();} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();}}
}

http://www.lryc.cn/news/411600.html

相关文章:

  • 【C++/STL】:哈希的应用 -- 位图布隆过滤器
  • 非线性面板数据实证模型及 Stata 具体操作步骤
  • 视角 | 麻省理工学院提出出温度计校准法,专治AI大模型过度自信
  • 昇思25天学习打卡营第XX天|CycleGAN图像风格迁移互换
  • 嵌入式Linux学习: interrupt实验
  • GPT-4o mini 来袭:开发者如何驾驭新一代AI模型?
  • 校园点餐系统
  • 进口不锈钢309S螺栓的应用优势
  • C# 设计模式之工厂方法模式
  • Webpack 从入门到精通
  • 基于VScode和C++ 实现Protobuf数据格式的通信
  • linux环境openssl升级
  • 150Kg载重遥控履带式无人车技术详解
  • STM32的外部中断详解
  • 关于python问题 ,生成的excel文件内无爬取的数据存在,请问应如何解决?
  • 详细介绍Avalonia中的文件操作StorageProvider服务
  • 「7.31更新日志」JVS·智能BI、逻辑、规则引擎功能更新说明
  • 编程语言 | C | 代码整理 | 4月
  • 模板可变参数
  • 是你!是你!我们的黄金写手!
  • QT 获取用于获取特定屏幕坐标处的最上层小部件(父与子关系的类)
  • 【应急响应】Linux权限维持 -隐藏权限
  • 还有哪些AI应用案例目前备受关注
  • 将控制台内容输出到文本文件
  • 380. O(1) 时间插入、删除和获取随机元素【 力扣(LeetCode) 】
  • 【每日刷题】Day91
  • 数据库索引的创建和使用
  • 光流传感器 - 从零开始认识各种传感器【第二十二期】
  • 爬虫:jsonpath模块及腾讯招聘数据获取
  • 透明屏幕的显示原理与特点