当前位置: 首页 > news >正文

系列十三、Java操作RocketMQ之带Key的消息

一、概述

        RocketMQ中的消息,默认会有一个messageId当做消息的唯一标识,我们也可以给消息携带一个key,用作唯一标识或者业务标识,包括在控制面板(Dashboard,RocketMQ的一个可视化面板)中也可以使用messageId或者key来进行查询。

二、案例代码

2.1、pom

        同案例五

2.2、RocketMQConstant

        同案例五

2.3、KeyConsumer

package org.star.key.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** @Author: 一叶浮萍归大海* @Date: 2023/9/8 10:15* @Description: 带有Key的消息消费者*/
@Slf4j
public class KeyConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("KeyConsumerGroup");consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe("KeyTopic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {log.info("消费者[KeyConsumer]正在消费消息,当前线程:{},消息内容:{},标签:{},key:{}",Thread.currentThread().getName(), StrUtil.utf8Str(list.get(0).getBody()),list.get(0).getTags(),list.get(0).getKeys());return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info("KeyConsumer start success");}}

2.4、KeyProducer

package org.star.key.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.star.constants.RocketMQConstant;import java.nio.charset.StandardCharsets;
import java.util.UUID;/*** @Author: 一叶浮萍归大海* @Date: 2023/9/8 10:06* @Description: 带有Key的消息生产者*/
@Slf4j
public class KeyProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("KeyProducerGroup");producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);producer.start();log.info("KeyProducer start success!");String key = UUID.randomUUID().toString().replaceAll("-","");Message message = new Message("KeyTopic","KeyTag",key,"我是一个带有标记和key的消息".getBytes(StandardCharsets.UTF_8));SendResult sendResult = producer.send(message);log.info("发送结果:{},消息ID:{},队列ID:{}",sendResult.getSendStatus(),sendResult.getMsgId(),sendResult.getMessageQueue().getQueueId());producer.shutdown();}}

2.5、控制台打印

# 生产者端
10:23:01.171 [main] INFO org.star.key.producer.KeyProducer - KeyProducer start success!
10:23:01.657 [main] INFO org.star.key.producer.KeyProducer - 发送结果:SEND_OK,消息ID:0AA86761652418B4AAC22646EA120000,队列ID:2# 消费者端
10:23:33.015 [ConsumeMessageThread_1] INFO org.star.key.consumer.KeyConsumer - 消费者[KeyConsumer]正在消费消息,当前线程:ConsumeMessageThread_1,消息内容:我是一个带有标记和key的消息,标签:KeyTag,key:bbf5efa94d0e473987f5718f3c023c9c

http://www.lryc.cn/news/159543.html

相关文章:

  • C#调用Dapper
  • 2023高教杯数学建模1:ABC题目+初步想法
  • ApachePulsar原理解析与应用实践(学习笔记一)
  • 2023开学礼《乡村振兴战略下传统村落文化旅游设计》许少辉八一新书南京财经大学图书馆
  • qt 信号与槽机制,登录界面跳转
  • uniapp的两个跳转方式
  • 【LeetCode】1654:到家的最少跳跃次数的解题思路 关于力扣无法return的BUG的讨论
  • Calico IP In IP模拟组网
  • 在linux上挂载windows共享目录
  • drone的简单使用
  • day 52 | 84.柱状图中最大的矩形
  • BUUCTF刷题十一道(08)
  • 快速构建基于Paddle Serving部署的Paddle Detection目标检测Docker镜像
  • SOLIDWORKS工程图自动零件序号的极致体验
  • 将ROS bag转成CSV
  • jframe生成柱状图片+图片垂直合并+钉钉机器人推送
  • 如何用J-Link仿真PY32F003系列芯片
  • # Go学习-Day10
  • vue3:5、组合式API-reactive和ref函数
  • Unity Inspector面板上显示Api
  • Redis功能实战篇之附近商户
  • selenium 自动化测试——元素定位
  • 【JMeter】 二次开发插件开发 Dubbo 接口测试插件浅析
  • 手机SSL证书认证失败是什么意思?
  • PXE网络批量装机(centos7)
  • P1104 生日
  • 计算机网络复习大纲
  • Linux:进程(概念)
  • 智能机器人:打造自动化未来的关键技术
  • 大数据(七):Pandas的基础应用详解(四)