当前位置: 首页 > article >正文

kafka学习笔记(三、消费者Consumer使用教程——从指定位置消费)

在这里插入图片描述


1.简介

Kafka的poll()方法消费无法精准的掌握其消费的起始位置,auto.offset.reset参数也只能在比较粗粒度的指定消费方式。更细粒度的消费方式kafka提供了seek()方法可以指定位移消费允许消费者从特定位置(如固定偏移量、时间戳或分区首尾)开始消费消息。

2.指定消费位置

2.1.从特定偏移量开始消费

使用seek(TopicPartition partition, long offset)指定具体偏移量。

源码分析:

  • seek()方法更新消费者内部的subscriptions对象的position字段,记录目标偏移量。
  • 后续poll()时,Fetcher类根据此位置向Broker发送拉取请求。

代码示例:

consumer.subscribe(Collections.singleton("test-topic"));
Set<TopicPartition> assignment = new HashSet<>();
// 确保分配到分区
while (assignment.isEmpty()) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();
}
// 设置所有分区从offset=100开始消费
assignment.forEach(tp -> consumer.seek(tp, 100));

2.2.从时间戳开始消费

使用offsetsForTimes()获取时间戳对应的偏移量,再调用seek()

源码分析:

offsetsForTimes()向Broker发送ListOffsetRequest,查询满足时间戳条件的最早或最新偏移量。

代码实例:

Map<TopicPartition, Long> timestamps = assignment.stream().collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 24 * 3600 * 1000L));
// 获取24小时前的偏移量
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
offsets.forEach((tp, offsetAndTs) -> {if (offsetAndTs != null) consumer.seek(tp, offsetAndTs.offset());
});

2.3.从分区首尾消费

使用seekToBeginning()seekToEnd(),或通过beginningOffsets()/endOffsets()获取首尾偏移量后手动设置。

代码实例:

// 从分区末尾开始消费(等效于auto.offset.reset=latest)
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
assignment.forEach(tp -> consumer.seek(tp, endOffsets.get(tp)));

2.4.注意事项

  1. 分区分配与poll()的依赖
    seek()必须在分区分配完成后调用,否则会抛出IllegalStateException。需通过循环poll()确保分配到分区。

  2. 数据过期问题
    若指定偏移量对应的消息已被删除(如日志清理导致),seek()将失效。此时需使用beginningOffsets()获取当前最小有效偏移量。

  3. 异步提交与位移覆盖风险
    异步提交(commitAsync())失败时不会重试,可能因位移回滚导致重复消费。需结合同步提交(commitSync())保证原子性

  4. seek()方法提供了我们可以将消费者位移保存在外部的能力,还可以配合在均衡监听器来提供更加精准的消费能力。

3.完整代码实例

public class SeekToTimestampDemo {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "seek-demo");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("enable.auto.commit", "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singleton("test-topic"));// 等待分区分配Set<TopicPartition> assignment = new HashSet<>();while (assignment.isEmpty()) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();}// 获取24小时前的时间戳对应偏移量Map<TopicPartition, Long> timestamps = assignment.stream().collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 86400000L));Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);// 指定位移offsets.forEach((tp, offsetAndTs) -> {if (offsetAndTs != null) {consumer.seek(tp, offsetAndTs.offset());} else {// 处理无有效偏移量的情况(如从头开始)consumer.seekToBeginning(Collections.singleton(tp));}});while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));records.forEach(record -> System.out.printf("offset=%d, value=%s%n", record.offset(), record.value()));}}
}
http://www.lryc.cn/news/2392290.html

相关文章:

  • 【后端高阶面经:架构篇】46、分布式架构:如何应对高并发的用户请求
  • 网络编程学习笔记——TCP网络编程
  • Vue+element-ui,实现表格渲染缩略图,鼠标悬浮缩略图放大,点击缩略图播放视频(一)
  • day13 leetcode-hot100-22(链表1)
  • 【Oracle】DQL语言
  • HUAWEI华为MateBook D 14 2021款i5,i7集显非触屏(NBD-WXX9,NbD-WFH9)原装出厂Win10系统
  • 【STIP】安全Transformer推理协议
  • leetcode hot100刷题日记——27.对称二叉树
  • 高考加油(Python+HTML)
  • 贪心算法应用:Ford-Fulkerson最大流问题详解
  • UE5 Niagara 如何让四元数进行旋转
  • 从“黑箱”到透明化:MES如何重构生产执行全流程?
  • 探索Linux互斥:线程安全与资源共享
  • JWT安全:假密钥.【签名随便写实现越权绕过.】
  • Python爬虫实战:抓取百度15天天气预报数据
  • RV1126 + FFPEG多路码流项目
  • NodeJS 基于 Koa, 开发一个读取文件,并返回给客户端文件下载,以及读取文件形成列表和文件删除的代码演示
  • 为什么在我的Flask里面有两个路由,但是在网页里有一个却不能正确访问到智能体
  • 哈工大计算机系统2024大作业——Hello的程序人生
  • 2025年软件测试面试八股文(含答案+文档)
  • 【仿生系统】qwen的仿生机器人解决方案
  • Flutter3.22适配运行鸿蒙系统问题记录
  • 秋招Day10 - JVM - 内存管理
  • Spring Boot 3.5.0中文文档上线
  • Redisson学习专栏(一):快速入门及核心API实践
  • Pandas学习入门一
  • 基于Piecewise Jerk Speed Optimizer的速度规划算法(附ROS C++/Python仿真)
  • 关于 JavaScript 版本、TypeScript、Vue 的区别说明, PHP 开发者入门 Vue 的具体方案
  • 中断和信号详解
  • STM32八股【10】-----stm32启动流程