当前位置: 首页 > news >正文

kafka复习:(11)auto.offset.reset的默认值

在ConsumerConfig这个类中定义了这个属性的默认值,如下图
在这里插入图片描述
也就是默认值为latest,它的含义是:如果没有客户端提交过offset的话,当新的客户端消费时,把最新的offset设置为当前消费的offset.

默认是自动提交位移的,每5秒进行一次提交。可以通过参数配置手动提交。

手动提交offset的示例


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;
/*
设置手动提交offset*/public class KafkaTest08 {private static Properties getProperties(){Properties properties=new Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"testGroup8");//设置手动提交位移properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");return properties;}public static void main(String[] args) {KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(getProperties());myConsumer.subscribe(Arrays.asList("student"));int i=0;while(true){ConsumerRecords<String,String> consumerRecords=myConsumer.poll(Duration.ofMillis(5000));for(ConsumerRecord record: consumerRecords){System.out.println(record.value());System.out.println("record offset is: "+record.offset());}myConsumer.commitSync();if(i==0){//myConsumer.commitSync();i ++;}else {i ++;}}}
}
http://www.lryc.cn/news/141728.html

相关文章:

  • 【javaweb】学习日记Day7 - Mysql 数据库 DQL 多表设计
  • 线程的生命周期
  • GAN | 论文精读 Generative Adversarial Nets
  • Yolo系列-yolov2
  • Linux下的系统编程——vim/gcc编辑(二)
  • 2023年国赛 高教社杯数学建模思路 - 案例:最短时间生产计划安排
  • 芯科科技推出专为Amazon Sidewalk优化的全新片上系统和开发工具,加速Sidewalk网络采用
  • Kotlin 丰富的函数特性
  • Node.js怎么搭建HTTP服务器
  • 基于Redisson的联锁(MultiLock)
  • 人脸识别平台批量导入绑定设备的一种方法
  • MySQL—MySQL的NULL值是怎么存放的
  • sql server删除历史数据
  • 目标检测项目中,使用python+xml.etree.ElementTree修改xml格式标注文件中的类别名称
  • 最新域名和子域名信息收集技术
  • C语言基础之——指针(上)
  • 构建 NodeJS 影院预订微服务并使用 docker 部署(04/4)
  • SpringBootWeb案例 Part3
  • C++中using 用法
  • window下jdk安装及更换jdk版本的一些问题。
  • GPT4模型架构的泄漏与分析
  • GEE/PIE遥感大数据处理与典型案例丨数据整合Reduce、云端数据可视化、数据导入导出及资产管理、机器学习算法等
  • STM32--DMA
  • mongodb和redis的用途
  • 【动手学深度学习】--18.图像增广
  • 数据分析--统计学知识
  • matlab 计算点云协方差矩阵
  • python进阶之图像编程 pillow扩展库
  • TiCDC Canal-JSON 消息接收示例(Java 版)
  • SQLite、MySQL、PostgreSQL3个关系数据库之间的对比