当前位置: 首页 > news >正文

springboot 开启和关闭kafka消费

关闭kafka自动消费

配置自定义容器工厂

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;@Component
@Configuration
public class kafkaConfig {@Autowiredprivate ConsumerFactory<String, String> consumerFactory;@Bean("pingKafkaFactory")public ConcurrentKafkaListenerContainerFactory<String, String> delayContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<String, String>();container.setConsumerFactory(consumerFactory);//禁止自动启动container.setAutoStartup(false);return container;}
}

在消费监听器上使用工厂,并设置id

@KafkaListener(topics = "#{pingProperties.getTopic().split(',')}",id = "pingConsumer",containerFactory = "pingKafkaFactory")

这样,启动项目后,就不会自动消费了。

手动开启和关闭消费


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Service;/*** Kafka消费监听服务实现类.*/
@Service
@Slf4j
public class KafkaConsumerListenerServiceImpl implements KafkaConsumerListenerService {/*** registry.*/@Autowiredprivate KafkaListenerEndpointRegistry registry;/*** 开启监听.** @param listenerId 监听ID*/@Overridepublic void startListener(String listenerId) {//判断监听容器是否启动,未启动则将其启动if (!registry.getListenerContainer(listenerId).isRunning()) {registry.getListenerContainer(listenerId).start();}//项目启动的时候监听容器是未启动状态,而resume是恢复的意思不是启动的意思//registry.getListenerContainer(listenerId).stop();log.info(listenerId + "开启监听成功。");}/*** 停止监听.** @param listenerId 监听ID*/@Overridepublic void stopListener(String listenerId) {registry.getListenerContainer(listenerId).stop();log.info(listenerId + "停止监听成功。");}}
http://www.lryc.cn/news/242199.html

相关文章:

  • org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder
  • 系统安全测试要怎么做?
  • Golang并发模型:Goroutine 与 Channel 初探
  • 批量添加PPT备注
  • 数据挖掘之PCA-主成分分析
  • 人工智能-注意力机制之注意力汇聚:Nadaraya-Watson 核回归
  • <HarmonyOS第一课>1·运行Hello World【课后考核】
  • Ubuntu18.04安装A-Loam保姆级教程
  • 重生之我是一名程序员 40 ——字符串函数(1)
  • Navicat 技术指引 | 连接 GaussDB 主备版
  • 【git】pip install git+https://github.com/xxx/xxx替换成本地下载编译安装解决网络超时问题
  • SQL Server对象类型(6)——4.6.存储过程和函数(Procedure和Function)
  • spring @Async异步执行
  • #Js篇:单线程模式同步任务异步任务任务队列事件循环setTimeout() setInterval()
  • html table样式的设计 表格边框修饰
  • 2023年【危险化学品经营单位安全管理人员】考试内容及危险化学品经营单位安全管理人员最新解析
  • 腾讯云 小程序 SDK对象存储 COS使用记录,原生小程序写法。
  • 【uniapp】本地资源图片无法通过 WXSS 获取,可以使用网络图片,或者 base64,或者使用image标签
  • 深入了解Spring Cloud中的分布式事务解决方案
  • 安装compiler version 5
  • 关闭vscode打开的本地服务器端口
  • VUE3+Springboot实现SM2完整步骤
  • CSS-背景属性篇
  • KyLin离线安装OceanBase
  • 插件预热 | 且看安全小白如何轻松利用Goby插件快速上分
  • pytorch下载离线包的网址
  • 【docker下安装jenkins】(一)
  • 【前端】必学知识ES6 1小时学会
  • 【学生成绩管理】数据库示例数据(MySQL代码)
  • 【电子通识】什么是物料清单BOM(Bill of Material))