当前位置: 首页 > news >正文

kafka:消费者从指定时间的偏移开始消费(二)

我的前一篇博客《kafka:AdminClient获取指定主题的所有消费者的消费偏移(一)》为了忽略忽略掉上线之前的所有消息,从获取指定主题的所有消费者的消费偏移并计算出最大偏移来解决此问题。
但这个方案需要使用不常用的AdminClient类,而且如果该主题如果是第一次被消费者拉取消息时,因为得不到消费者的消费偏移,最后的结果,就是从0偏移开始拉取所有消息。并不能真正实现忽略上线之前所有消息的目的。
所以我又优化了方案。基本的原理就是使用KafkaConsumer.offsetsForTimes方法获取消费者的所有主题分区的指定时间的偏移,并将这个偏移作为消费开始的偏移(KafkaConsumer.seek方法) 。

	@Testpublic void test3SeekToTime() {// 配置Kafka消费者的属性Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "my_consumer_group");props.put("key.deserializer", StringDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());// 创建Kafka消费者实例try(Consumer<String, String> consumer = new KafkaConsumer<>(props)){			boolean seek = false;/** * 循环开始的时间,* 忽略该时间之前的消息*/long startMills = System.currentTimeMillis();while (true) {try {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(4000));if(!seek) {if(!records.isEmpty()) {/** * 获取第一批消息时更新消息偏移到循环开始的时间*/consumer.offsetsForTimes(Maps.asMap(consumer.assignment(),t->startMills)).forEach((k,v)->{if(null != v) {System.out.println("seek %s to %s",k,v.offset());consumer.seek(k,v.offset());}});seek = true;}/** 跳过第一批获取到的消息,继续循环 */continue;}records.forEach(record -> {String value = record.value();System.out.println("Received message: " + value);});}catch (Exception e) {e.printStackTrace();}}}}
http://www.lryc.cn/news/101983.html

相关文章:

  • Spring的加载配置文件、容器和获取bean的方式
  • (二)利用Streamlit创建第一个app——单页面、多页面
  • 一条sql查询语句在mysql中的执行过程是什么
  • 网络互联究竟是需要什么协议相同,什么协议不同?
  • ajax axios json
  • 外观模式——提供统一入口
  • Vue中导入并读取Excel数据
  • CUDA常用函数
  • 72. ElasticSearch常用命令
  • 2023.7.26(同余方程的通解与特解)
  • Diffusion扩散模型学习3——Stable Diffusion结构解析-以图像生成图像(图生图,img2img)为例
  • LangChain||什么是LangChain? LangChain有什么用?
  • 秋招算法备战第28天 | 93.复原IP地址、78.子集、90.子集II
  • Mongodb空间索引的使用以及与Django的对接
  • Windows安装MySQL数据库
  • 聊聊函数式编程中的“式”
  • ubuntu目录分析
  • Python 进阶(三):正则表达式(re 模块)
  • Vue2 第六节 key的作用与原理
  • React之组件的生命周期
  • linux -网络编程-多线程并发服务器
  • Golang之路---02 基础语法——字典
  • Pytorch(三)
  • Linux——进程控制
  • 剑指 Offer 59 - I. 滑动窗口的最大值 / LeetCode 239. 滑动窗口最大值(优先队列 / 单调队列)
  • 【Linux后端服务器开发】IP协议
  • React组件进阶之children属性,props校验与默认值以及静态属性static
  • ceph集群中RBD的性能测试、性能调优
  • texshop mac中文版-TeXShop for Mac(Latex编辑预览工具)
  • 简单认识redis高可用实现方法