当前位置: 首页 > news >正文

kafka动态监听主题

简单版本

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);containerProperties.setMessageListener((MessageListener<String, String>) record -> {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
}

手动ack版本

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置消息监听器为 AcknowledgingMessageListenercontainerProperties.setMessageListener((AcknowledgingMessageListener<String, String>) (record, ack) -> {try {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());// 模拟消息处理逻辑// 处理完成后手动确认消息if (ack != null) {ack.acknowledge();}} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}
}

批量处理版本 

    @Autowiredprivate ConsumerFactory<String, String> consumerFactory;public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 设置批量消息监听器containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {try {for (ConsumerRecord<String, String> record : records) {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());}// 模拟消息处理逻辑// 处理完成后手动批量确认消息ack.acknowledge();} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();}

可关闭版本

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Service;import java.util.HashMap;
import java.util.Map;@Service
public class DynamicKafkaListenerService {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;// 用于保存每个主题对应的监听器容器private final Map<String, ConcurrentMessageListenerContainer<String, String>> containerMap = new HashMap<>();/*** 开启一个监听*/public void registerListener(String topic) {ContainerProperties containerProperties = new ContainerProperties(topic);// 设置手动确认模式containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 设置批量消息监听器containerProperties.setMessageListener((BatchAcknowledgingMessageListener<String, String>) (records, ack) -> {try {for (ConsumerRecord<String, String> record : records) {System.out.println(Thread.currentThread().getName() + " 主题 " + topic + " 收到消息: " + record.value());}// 模拟消息处理逻辑// 处理完成后手动批量确认消息ack.acknowledge();} catch (Exception e) {// 处理异常情况,例如记录日志或重试等System.err.println("消息处理失败: " + e.getMessage());}});ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);container.setBeanName(topic + "-listener");container.start();// 将监听器容器保存到 map 中containerMap.put(topic, container);}/*** 关闭一个监听*/public void stopListener(String topic) {ConcurrentMessageListenerContainer<String, String> container = containerMap.get(topic);if (container != null && container.isRunning()) {container.stop();// 从 map 中移除已停止的监听器容器containerMap.remove(topic);}}
}

调用添加监听

    /*** 配置详情*/@GetMapping("/getModelZdyConfInfo")public String getModelZdyConfInfo(String topic) {dynamicKafkaListenerService.registerListener(topic);return "添加" + topic + "监听成功";}

http://www.lryc.cn/news/535559.html

相关文章:

  • Python虚拟环境管理工具 pyenv
  • 网络安全产品架构图 网络安全相关产品
  • C++ 实践扩展(Qt Creator 联动 Visual Studio 2022)
  • 如何实现Deepseek的本地部署并集成本地知识库?
  • vue学习笔记8
  • 【自学笔记】Vue基础知识点总览-持续更新
  • ETL的使用(sqoop):数据导入,导出
  • 【核心特性】从鸭子类型到Go的io.Writer设计哲学
  • 多模态模型详解
  • Go 语言里中的堆与栈
  • 八、OSG学习笔记-
  • 本地部署【LLM-deepseek】大模型 ollama+deepseek/conda(python)+openwebui/docker+openwebui
  • 网络分析工具—WireShark的安装及使用
  • MobaXterm的图形化界面支持:原理与分辨率问题解决
  • Java JVM(Java Virtual Machine)解析
  • pytest测试专题 - 1.2 如何获得美观的测试报告
  • 现阶段股指期货交易保证金和费用多少?股指期货一手多少钱?
  • 使用mermaid画流程图
  • 大模型笔记:pytorch实现MOE
  • HAL库USART中断接收的相关问题
  • @Transational事务注解底层原理以及什么场景事务会失效
  • Linux扩容磁盘
  • 全面解析鸿蒙(HarmonyOS)开发:从入门到实战,构建万物互联新时代
  • Uniapp 原生组件层级过高问题及解决方案
  • Android adb测试常用命令大全
  • linux的基础入门2
  • 19.4.8 数据库综合运用
  • JAVA中的抽象学习
  • 在 Go 中实现事件溯源:构建高效且可扩展的系统
  • 加解密 | AES加、解密学习