当前位置: 首页 > news >正文

使用Java代码操作Kafka(五):Kafka消费 offset API,包含指定 Offset 消费以及指定时间消费

文章目录

    • 1、指定 Offset 消费
    • 2、指定时间消费


1、指定 Offset 消费

auto.offset.reset = earliest | latest | none 默认是 latest
(1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常

这个参数的力度太大了,不是从头,就是从尾
kafka提供了seek方法,可以让我们从分区的固定位置开始消费
seek(TopicPartition topicPartition,offset offset)

示例代码:

package com.bigdata.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;public class CustomConsumerSeek {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 关闭自动提交offsetproperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 2 订阅一个主题ArrayList<String> topics = new ArrayList<>();topics.add("first");kafkaConsumer.subscribe(topics);// 执行计划// 此时的消费计划是空的,因为没有时间生成Set<TopicPartition> assignment = kafkaConsumer.assignment();while(assignment.size() == 0){// 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环assignment = kafkaConsumer.assignment();}// 获取所有分区的offset =5 以后的数据/*for (TopicPartition tp:assignment) {kafkaConsumer.seek(tp,5);}*/// 获取分区0的offset =5 以后的数据//kafkaConsumer.seek(new TopicPartition("bigdata",0),5);for (TopicPartition tp:assignment) {if(tp.partition() == 0){kafkaConsumer.seek(tp,5);}}while(true){//1 秒中向kafka拉取一批数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> record :records) {// 打印一条数据System.out.println(record);// 可以打印记录中的很多内容,比如 key  value  offset topic 等信息System.out.println(record.value());}}}
}

2、指定时间消费

示例代码:

package com.bigdata.consumer;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;/***  从某个特定的时间开始进行消费*/
public class Customer05 {public static void main(String[] args) {// 其实就是mapProperties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testf");// 指定分区的分配方案  为轮询策略//properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");// 指定分区的分配策略为:Sticky(粘性)ArrayList<String> startegys = new ArrayList<>();startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);// 创建一个kafka消费者的对象KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?List<String> topics = new ArrayList<>();topics.add("five");// list总可以设置多个主题的名称kafkaConsumer.subscribe(topics);// 因为消费者是不停的消费,所以是while true// 指定了获取分区数据的起始位置。// 这样写会报错的,因为前期消费需要指定计划,指定计划需要时间// 此时的消费计划是空的,因为没有时间生成Set<TopicPartition> assignment = kafkaConsumer.assignment();while(assignment.size() == 0){// 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环assignment = kafkaConsumer.assignment();}Map<TopicPartition, Long> hashMap = new HashMap<>();for (TopicPartition partition:assignment) {hashMap.put(partition,System.currentTimeMillis()- 60*60*1000);}Map<TopicPartition, OffsetAndTimestamp> map = kafkaConsumer.offsetsForTimes(hashMap);for (TopicPartition partition:assignment) {OffsetAndTimestamp offsetAndTimestamp = map.get(partition);kafkaConsumer.seek(partition,offsetAndTimestamp.offset());}while(true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record:records) {// 打印数据中的值System.out.println(record.value());System.out.println(record.offset());// 打印一条数据System.out.println(record);}}}
}
http://www.lryc.cn/news/491827.html

相关文章:

  • Ubuntu安装不同版本的opencv,并任意切换使用
  • 突破内存限制:Mac Mini M2 服务器化实践指南
  • 【排版教程】Word、WPS 分节符(奇数页等) 自动变成 分节符(下一页) 解决办法
  • 【在Linux世界中追寻伟大的One Piece】多线程(二)
  • flink学习(8)——窗口函数
  • 「实战应用」如何用图表控件LightningChart .NET实现散点图?(一)
  • 鸿蒙Native使用Demo
  • 29.UE5蓝图的网络通讯,多人自定义事件,变量同步
  • Scala—列表(可变ListBuffer、不可变List)用法详解
  • 【论文复现】偏标记学习+图像分类
  • C嘎嘎探索篇:栈与队列的交响:C++中的结构艺术
  • AIGC-----AIGC在虚拟现实中的应用前景
  • Django 路由层
  • 《硬件架构的艺术》笔记(八):消抖技术
  • Spring 与 Spring MVC 与 Spring Boot三者之间的区别与联系
  • 【算法】连通块问题(C/C++)
  • 如何选择黑白相机和彩色相机
  • Rust 力扣 - 740. 删除并获得点数
  • OpenCV从入门到精通实战(七)——探索图像处理:自定义滤波与OpenCV卷积核
  • Docker核心概念总结
  • 环形缓冲区
  • jQuery-Word-Export 使用记录及完整修正文件下载 jquery.wordexport.js
  • 云服务器部署WebSocket项目
  • C#+数据库 实现动态权限设置
  • (原创)Android Studio新老界面UI切换及老版本下载地址
  • Ubuntu24虚拟机-gnome-boxes
  • k8s rainbond centos7/win10 -20241124
  • SpringBoot+Vue滑雪社区网站设计与实现
  • MySql.2
  • 算法之区间和题目讲解