当前位置: 首页 > news >正文

Kafka3.0.0版本——消费者(自动提交 offset)

目录

    • 一、自动提交offset的相关参数
    • 二、消费者(自动提交 offset)代码示例

一、自动提交offset的相关参数

  • 官网文档
    在这里插入图片描述

  • 参数解释

    参数描述
    enable.auto.commi默认值为 true,消费者会自动周期性地向服务器提交偏移量。
    auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。
  • 图解分析

    在这里插入图片描述

二、消费者(自动提交 offset)代码示例

  • 消费者自动提交 offset代码

    // 自动提交
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
    // 提交时间间隔 1秒
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
    
  • 消费者自动提交 offset代码完整代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumerAutoOffset {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");// 自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);// 提交时间间隔 1秒properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 firstArrayList<String> topics = new ArrayList<>();topics.add("sevenTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
    }
    
http://www.lryc.cn/news/168341.html

相关文章:

  • 【业务功能116】微服务-springcloud-springboot-Kubernetes集群-k8s集群-KubeSphere-公共服务 DNS
  • 马斯洛的动机与人格、需求层次理论
  • TCP/IP网络传输模型及协议
  • git 推送出现fatal: The remote end hung up unexpectedly解决方案
  • Hive内置函数字典
  • svg 知识点总结
  • 开源库源码分析:OkHttp源码分析(二)
  • 校园地理信息系统的设计与实现
  • Vulnhub实战-prime1
  • Scala学习笔记
  • 虹科分享 | 软件供应链攻击如何工作?如何评估软件供应链安全?
  • gRpc入门和springboot整合
  • 基于FPGA点阵显示屏设计-毕设
  • Rocky9.2基于http方式搭建局域网yum源
  • Android 串口通讯
  • 论如何在Android中还原设计稿中的阴影
  • Hadoop生态圈中的Flume数据日志采集工具
  • FFmpeg获取媒体文件的视频信息
  • io概述及其分类
  • 前端面试话术集锦第 14 篇:高频考点(React常考基础知识点)
  • UI/UX+前端架构:设计和开发高质量的用户界面和用户体验
  • 长尾关键词挖掘软件-免费的百度搜索关键词挖掘
  • React Native 环境配置(mac)
  • CAD for JS:VectorDraw web library 10.1004.1 Crack
  • 代码管理工具git1
  • 层次聚类分析
  • Jmeter性能实战之分布式压测
  • 学信息系统项目管理师第4版系列08_管理科学基础
  • 从2023蓝帽杯0解题heapSpary入门堆喷
  • 基于SSM的学生宿舍管理系统设计与实现