当前位置: 首页 > news >正文

Flink 消费 Kafka 数据,按照指定时间开始消费

Kafka 中根据时间戳开始消费数据

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.nodes.CollectionNode;import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;/*** 支持按topic指定开始消费时间戳** @author */
public class KafkaOffsetsInitializer implements OffsetsInitializer  {private Logger logger = LoggerFactory.getLogger(KafkaOffsetsInitializer.class);private static final long serialVersionUID = 1L;/*** key:topic,value:开始消费时间戳*/private Map<String, Long> topicStartingTimestamps;private ParameterTool parameters;/*** @param topicStartingTimestamps* @param parameters*/public KafkaOffsetsInitializer(Map<String, Long> topicStartingTimestamps, ParameterTool parameters) {this.topicStartingTimestamps = topicStartingTimestamps;this.parameters = parameters;}@Overridepublic Map<TopicPartition, Long> getPartitionOffsets(Collection<TopicPartition> partitions,PartitionOffsetsRetriever partitionOffsetsRetriever) {//定义起始时间,初始offsetMap<TopicPartition, Long> startingTimestamps = new HashMap<>();Map<TopicPartition, Long> initialOffsets = new HashMap<>();//commited offsetMap<TopicPartition, Long> committedOffsets = partitionOffsetsRetriever.committedOffsets(partitions);//beginningOffsets the first offset for the given partitions.Map<TopicPartition, Long> beginningOffsets = partitionOffsetsRetriever.beginningOffsets(partitions);//endOffsets the for the given partitions.Map<TopicPartition, Long> endOffsets = partitionOffsetsRetriever.endOffsets(partitions);final long now = System.currentTimeMillis();partitions.forEach(tp -> {//起始时间赋值为从redis中获取到相对应topic的时间Long startingTimestamp = topicStartingTimestamps.get(tp.topic());if (startingTimestamp == null) {//redis里没有取到消费开始时间从启动时间消费startingTimestamp = now;logger.info("从redis没有取到时间戳,topic:{},partition:{},使用当前时间:{},{}", tp.topic(), tp.partition(), now, new Date(now));}logger.info("读取时间戳,topic:{},partition:{},时间戳:{},{}", tp.topic(), tp.partition(), now, new Date(now));startingTimestamps.put(tp, startingTimestamp);});partitionOffsetsRetriever.offsetsForTimes(startingTimestamps).forEach((tp, offsetMetadata) -> {long offsetForTime = beginningOffsets.get(tp);long offsetForCommit = beginningOffsets.get(tp);if (offsetMetadata != null) {offsetForTime = 
offsetMetadata.offset();logger.info("根据时间戳取到offset,topic:{},partition:{},offset:{}", tp.topic(), tp.partition(), offsetForTime);}Long commitedOffset = committedOffsets.get(tp);if (commitedOffset != null) {offsetForCommit = commitedOffset.longValue();logger.info("根据已提交offset取到offset,topic:{},partition:{},offset:{}", tp.topic(), tp.partition(), offsetForCommit);}logger.info("设置读取offset,topic:{},partition:{},offset:{},endOffset:{}", tp.topic(), tp.partition(), Math.max(offsetForTime, offsetForCommit), endOffsets.get(tp));//对比时间戳对应的offset和checkpoint保存的offset,取较大值//initialOffsets.put(tp, Math.max(offsetForTime, offsetForCommit));initialOffsets.put(tp, offsetForCommit);});return initialOffsets;}@Overridepublic OffsetResetStrategy getAutoOffsetResetStrategy() {return OffsetResetStrategy.NONE;}
}
http://www.lryc.cn/news/129540.html

相关文章:

  • 【SpringCloud】Feign使用
  • WebApIs 第五天
  • 按斤称的C++散知识
  • C++策略模式
  • 如何在网页下载腾讯视频为本地MP4格式
  • opencv-yolov8-目标检测
  • CRYPTO 密码学-笔记
  • 基于YOLOv8模型的五类动物目标检测系统(PyTorch+Pyside6+YOLOv8模型)
  • Java课题笔记~ SpringBoot基础配置
  • vue实现文件上传,前后端
  • OJ练习第151题——克隆图
  • keepalived+lvs实现高可用
  • 【Let‘s make it big】英语合集61~70
  • python实现图像的二分类
  • 8.深浅拷贝和异常处理
  • Element Plus el-table 数据为空时自定义内容【默认为 No Data】
  • 使用nginx和frp实现高效内网穿透:简单配置,畅通无阻
  • Python土力学与基础工程计算.PDF-螺旋板载荷试验
  • 低代码开发ERP:精打细算,聚焦核心投入
  • 顺序表(数据结构)
  • stable_diffusion_webui docker环境配置
  • 【Java】常见面试题:HTTP/HTTPS、Servlet、Cookie、Linux和JVM
  • 批量爬虫采集完成任务
  • intelij idea 2023 创建java web项目
  • 【论文笔记】基于指令回译的语言模型自对齐-MetaAI
  • MySQL和MariaDB的版本对应关系
  • Python数据的输入与输出
  • 生成国密密钥对
  • ASR(自动语音识别)任务中的LLM(大语言模型)
  • 简单介绍一下centos上有什么工具可以优雅的管理开机启动项