当前位置: 首页 > news >正文

Flink系列之:动态发现新增分区

Flink系列之:动态发现新增分区

  • 一、动态发现新增分区
  • 二、Flink SQL动态发现新增分区
  • 三、Flink API动态发现新增分区

为了在不重新启动 Flink 作业的情况下处理主题扩展或主题创建等场景,可以将 Kafka 源配置为在提供的主题分区订阅模式下定期发现新分区。要启用分区发现,请为属性partition.discovery.interval.ms设置一个非负值。

一、动态发现新增分区

flink程序增加自动发现分区参数:

  • flink.partition-discovery.interval-millis是一个配置属性,用于设置Flink作业中的分区发现间隔时间(以毫秒为单位)。
  • 在Flink作业中,数据源(例如Kafka或文件系统)的分区可能会发生变化。为了及时感知分区的变化情况,并根据变化进行相应的处理,Flink提供了分区发现机制。
  • flink.partition-discovery.interval-millis配置属性用于设置Flink作业在进行分区发现时的间隔时间。Flink作业会定期检查数据源的分区情况,如果发现分区发生了变化(例如增加或减少了分区),Flink会相应地调整作业的并行度或重新分配任务来适应新的分区情况。
  • 通过调整flink.partition-discovery.interval-millis的值,可以控制Flink作业进行分区发现的频率。较小的间隔时间可以实时感知到分区变化,但可能会增加作业的开销;较大的间隔时间可以减少开销,但可能导致较长时间的延迟。
  • 需要注意的是,flink.partition-discovery.interval-millis的默认值是5分钟(300000毫秒),可以根据具体需求进行调整。

二、Flink SQL动态发现新增分区

参数:scan.topic-partition-discovery.interval

CREATE TABLE KafkaTable (`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',`partition` BIGINT METADATA VIRTUAL,`offset` BIGINT METADATA VIRTUAL,`user_id` BIGINT,`item_id` BIGINT,`behavior` STRING
) WITH ('connector' = 'kafka','topic' = 'user_behavior','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'csv'
);

Connector Options:

OptionRequiredDefaultTypeDescription
scan.topic-partition-discovery.intervaloptional(none)Duration消费者定期发现动态创建的Kafka主题和分区的时间间隔。

三、Flink API动态发现新增分区

参数:partition.discovery.interval.ms

Java

KafkaSource.builder()    
.setProperty("partition.discovery.interval.ms", "10000"); 
// discover new partitions per 10 seconds

Python

KafkaSource.builder() \.set_property("partition.discovery.interval.ms", "10000")  # discover new partitions per 10 seconds
http://www.lryc.cn/news/106319.html

相关文章:

  • eclipse版本与jdk版本对应关系
  • File类的学习
  • Linux 操作系统 Red Hat Enterprise Linux 安装教程
  • 关于拓扑排序
  • 【C++】开源:Boost库常用组件配置使用
  • 用python通过http实现文件传输,分为发送端和接收端
  • 数据结构--图的遍历 DFS
  • SpringBoot集成MyBatisPlus+MySQL(超详细)
  • 一边是计算机就业哀鸿遍野,一边是高考生疯狂涌向计算机专业
  • 解决外部主机无法访问Docker容器的方法
  • IDEA中修改类头的文档注释信息
  • 建模教程:如何利用3ds Max 和 After Effects 实现多通道渲染和后期合成
  • JPA之Hibernate
  • leetcode(力扣)剑指 Offer 16. 数值的整数次方 (快速幂)
  • git命令分类合集
  • 微信小程序打开地图的方法
  • 快手头部主播合体,二驴祁天道直播首秀销售额破亿
  • Golang Devops项目开发(1)
  • Django系列之DRF简单使用
  • 新闻标题文本分类任务
  • 自己实现MyBatis 底层机制--抽丝剥茧(上)
  • Django后端执行成功或失败状态码
  • Prometheus中的关键设计
  • Centos7 安装yum
  • 无涯教程-Lua - 简介
  • 【第一阶段】kotlin语言引用数据类型
  • BUU [网鼎杯 2020 朱雀组]phpweb
  • 使用WebMvcConfigurationSupport后导致原来返回的json数据变为了xml的解决方法
  • 如何判断一个枚举值是否存在(Check if an Enum Value Exists in Java)
  • 网工内推 | 网络安全工程师,最高15K,有高温补贴