
Implementing Dynamic Topic Listening with Kafka

Life is innocent, long live health. I am laity.

Seven times have I despised my soul:

The first time, when it could have striven, yet feigned humility;

The second time, when it was empty, and filled the void with lust;

The third time, when, between the hard and the easy, it chose the easy;

The fourth time, when it erred, and consoled itself that others err as well;

The fifth time, when it chose weakness of its own accord, yet mistook it for the tenacity of life;

The sixth time, when it sneered at an ugly face, not knowing that face was one of its own masks;

The seventh time, when it wallowed in the mire of life and, though unwilling, remained timid and hesitant.


Business scenario: an upstream distribution service ("导条") pushes data to each partner through their interfaces, and the payload includes a dynamically generated Kafka topic name. Each partner receives its data by subscribing to that topic, then runs its downstream business logic. Since the topic name is only known at runtime, the listener cannot be declared statically with a fixed `@KafkaListener` annotation.

Implementation

pom
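The original dependency snippet under "pom" did not survive extraction. As a minimal sketch, assuming a Spring Boot project whose parent/BOM manages versions, the key Maven dependency would be spring-kafka:

```xml
<!-- Spring for Apache Kafka; the version is managed by the Spring Boot parent/BOM -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```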

YAML, option 1: values are received as String

```yaml
spring:
  kafka:
    bootstrap-servers: youKafkaIp:9092 # Kafka broker address; multiple brokers can be listed, separated by commas
    listener:
      type: batch
    consumer:
      enable-auto-commit: false
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      group-id: consumer-sb
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
```

YAML, option 2: values are received as byte arrays

```yaml
spring:
  kafka:
    bootstrap-servers: youKafkaIp:9092 # Kafka broker address; multiple brokers can be listed, separated by commas
    listener:
      type: batch
    consumer:
      enable-auto-commit: false
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      auto-offset-reset: earliest
      group-id: consumer-sb
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
```
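With option 2 the listener receives raw `byte[]` values, so the application must decode them itself. A minimal sketch (the `PayloadDecoder` helper is hypothetical, not from the original post), assuming producers use `StringSerializer`, which encodes UTF-8:

```java
import java.nio.charset.StandardCharsets;

public class PayloadDecoder {

    // Decode a raw Kafka value back into text. Producers in this setup use
    // StringSerializer, which writes UTF-8 bytes, so UTF-8 is the safe default here.
    public static String decode(byte[] value) {
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = "hello-kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(payload));
    }
}
```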

Consumer code

KafkaConfig.java

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @author laity
 */
@EnableKafka
@Configuration
public class KafkaConfig {

    // Note: two attempted fixes for "Could not create message listener -
    // MessageHandlerMethodFactory not set" did NOT work here:
    //   1) registering a custom KafkaListenerAnnotationBeanPostProcessor bean, and
    //   2) implementing KafkaListenerConfigurer and setting the factory in
    //      configureKafkaListeners(KafkaListenerEndpointRegistrar).
    // What did work is calling setMessageHandlerMethodFactory(...) on the
    // endpoint itself, in DynamicKafkaListenerService below.

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> map = new HashMap<>();
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "youKafkaIp:9092");
        map.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-laity");
        map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new DefaultKafkaConsumerFactory<>(map);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Up to 5 concurrent consumer threads per container
        factory.setConcurrency(5);
        return factory;
    }
}
```

KafkaListenerController.java

```java
package cn.iocoder.yudao.server.controller.admin.szbl;

import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.server.controller.admin.szbl.common.config.kafka.MyComponent;
import cn.iocoder.yudao.server.controller.admin.szbl.vo.InitSceneVO;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.security.PermitAll;

/**
 * @author laity
 */
@RestController
@RequestMapping("/kafka")
public class KafkaListenerController {

    private final MyComponent component;

    public KafkaListenerController(MyComponent component) {
        this.component = component;
    }

    // Endpoint that receives the upstream data-distribution callback
    @PostMapping("/reception")
    @PermitAll
    public CommonResult<Boolean> putAwayL(@RequestBody InitSceneVO vo) {
        // ... business logic ...
        // Start listening on the topic name carried in the request
        component.startListening(vo.getGzTopicName());
        return CommonResult.success(true);
    }
}
```

DynamicKafkaListenerService.java

```java
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Service;

import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.util.Objects;

/**
 * @author laity
 * Dynamically manages Kafka listeners.
 */
@Service
public class DynamicKafkaListenerService {

    private final KafkaListenerEndpointRegistry registry;
    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    @Autowired
    public DynamicKafkaListenerService(KafkaListenerEndpointRegistry registry,
                                       ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.registry = registry;
        this.factory = factory;
    }

    public void addListener(String topic, String groupId, Object bean, Method method) {
        // Unwrap AOP proxies so the reflective invocation targets the real bean
        if (AopUtils.isAopProxy(bean)) {
            try {
                bean = ((Advised) bean).getTargetSource().getTarget();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
        assert bean != null;
        endpoint.setBean(bean);
        endpoint.setMethod(method);
        endpoint.setTopics(topic);
        endpoint.setGroup(groupId);
        endpoint.setId(method.getName() + "_" + LocalDateTime.now());
        // Without this call, registration fails with
        // "Could not create message listener - MessageHandlerMethodFactory not set"
        endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        // Register with the shared container factory and start immediately
        registry.registerListenerContainer(endpoint, factory, true);
    }

    public void removeListener(String listenerId) {
        Objects.requireNonNull(registry.getListenerContainer(listenerId)).stop();
        registry.unregisterListenerContainer(listenerId);
    }
}
```

BlueKafkaConsumer.java

```java
import org.springframework.stereotype.Component;

/**
 * @author laity
 */
@Component
public class BlueKafkaConsumer {

    // The static annotation is intentionally left commented out;
    // this method is registered dynamically instead.
    // @KafkaListener(topics = "#{__listener.getTopicName()}", groupId = "consumer-laity")
    public void listen(Object record) {
        System.out.println("======================= Received message on dynamically registered topic ========================");
        System.out.println(record.toString());
    }
}
```

MyComponent.java

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.lang.reflect.Method;

/**
 * @author laity
 */
@Component
public class MyComponent {

    private final DynamicKafkaListenerService kafkaListenerService;
    private final BlueKafkaConsumer blueKafkaConsumer;

    @Autowired
    public MyComponent(DynamicKafkaListenerService kafkaListenerService, BlueKafkaConsumer blueKafkaConsumer) {
        this.kafkaListenerService = kafkaListenerService;
        this.blueKafkaConsumer = blueKafkaConsumer;
    }

    public void startListening(String topic) {
        try {
            // Register BlueKafkaConsumer.listen(Object) as the handler for the new topic
            Method blueMethod = BlueKafkaConsumer.class.getMethod("listen", Object.class);
            kafkaListenerService.addListener(topic, "consumer-laity", blueKafkaConsumer, blueMethod);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException(e);
        }
    }

    public void stopListening(String listenerId) {
        kafkaListenerService.removeListener(listenerId);
    }

    @PostConstruct // Runs once at startup, but the topics here must be changeable at runtime,
    public void init() { // so registration happens in startListening() instead
    }
}
```

Two of the most valuable words in the world are earnestness and persistence: the earnest change themselves, and the persistent change their fate. Some things are not persisted in because there is hope; rather, there is hope because we persist. I am Laity, a Laity moving forward.
