当前位置: 首页 > news >正文

springboot 使用RocketMQ客户端生产消费消息DEMO

创建springboot项目省略

项目依赖

注意:当前客户端版本是 5.1.3 ,安装的rocketmq服务的版本要与其对应

	<properties><java.version>11</java.version><rocketmq-client-java-version>5.1.3</rocketmq-client-java-version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>${rocketmq-client-java-version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>

创建 JmsConfig


public class JmsConfig {//roketmq 服务地址public static String nameServerAddr = "192.168.2.109:9876";//主题public static String TOPIC = "test_topic";
}

创建生产者 Producer

package com.example.springbootrocketmq.jms;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;@Component
public class PayProducer {//生产组private String producerGroup = "test_group";private DefaultMQProducer producer;public PayProducer() {producer = new DefaultMQProducer(producerGroup);//多个NameServer地址 多个地址 ; 号隔开producer.setNamesrvAddr(JmsConfig.nameServerAddr);start();}/*** 开始*/public void start(){try {this.producer.start();} catch (MQClientException e) {e.printStackTrace();}}public DefaultMQProducer getProducer(){return this.producer;}/*** 一般关闭上下文是关闭*/@PreDestroypublic void shutdown(){System.out.println("关闭....");this.producer.shutdown();}
}

创建消费者 Consumer


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;@Component
public class Consumer {private DefaultMQPushConsumer consumer;private String consumerGroup = "test_consumer_group";public PayConsumer() throws Exception{consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(JmsConfig.nameServerAddr);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.subscribe(JmsConfig.TOPIC, "*");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {try {Message msg = msgs.get(0);System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));String topic = msg.getTopic();String body = new String(msg.getBody(), "utf-8");String tags = msg.getTags();String keys = msg.getKeys();System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (UnsupportedEncodingException e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});consumer.start();}
}

配置 TestController


import com.example.springbootrocketmq.jms.JmsConfig;
import com.example.springbootrocketmq.jms.Producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@Slf4j
@RestController
public class TestController{@Autowiredprivate Producer producer;@RequestMapping("/api/v1/test_cb")public Object callback(String text) throws Exception {Message message = new Message(JmsConfig.TOPIC,"taga",("hello rocketmq = "+ text).getBytes());SendResult sendResult = producer.getProducer().send(message);log.info(sendResult.toString());return null;}
}

测试结果

在这里插入图片描述
在这里插入图片描述

http://www.lryc.cn/news/194061.html

相关文章:

  • 第三章 内存管理 四、连续分配管理方式
  • npm install报--4048错误和ERR_SOCKET_TIMEOUT问题解决方法之一
  • 合并两个有序数组
  • 自动泊车系统设计学习笔记
  • 基于Java的家电销售网站管理系统设计与实现(源码+lw+部署文档+讲解等)
  • 设计模式~备忘录模式(memento)-22
  • 【Agora UID 踩坑记录 Java 数据类型】
  • ESP8285 RTOS SDK OTA
  • Hadoop3教程(四):HDFS的读写流程及节点距离计算
  • [0xGameCTF 2023] web题解
  • Qt之submodule编译
  • Python实现带图形界面的计算器
  • $ vue -Vbash: vue: command not found
  • 专业音视频领域中,Pro AV的崛起之路
  • vscode 右侧滚动条标记不提示,问题解决纪录
  • 【Java 进阶篇】JavaScript特殊语法详解
  • PCL点云处理之配准中的匹配对连线可视化显示 Correspondences(二百一十九)
  • Vue el-table全表搜索,模糊匹配-前端静态查询
  • 基于html5开发的Win12网页版,抢先体验
  • Studio One6.5中文版本下载安装步骤
  • Java架构师缓存架构设计解决方案
  • 【玩转Redhat Linux 8.0系列 | 实验—使用Bash shell执行命令】
  • Linux系统编程详解
  • ios设备管理软件iMazing 2.17.11官方中文版新增功能介绍
  • 算法通关村第18关【青铜】| 回溯
  • 【环境搭建】linux docker-compose安装seata1.6.1,使用nacos注册、db模式
  • 20231008-20231013 读书笔记
  • YOLOv8 windows下的离线安装 offline install 指南 -- 以 带有CUDA版本的pytorch 为例
  • 百度车牌识别AI Linux使用方法-armV7交叉编译
  • 数学建模——确定性时间序列分析方法