当前位置: 首页 > news >正文

flink使用StatementSet降低资源浪费

背景

项目中有很多ods层(mysql 通过cannal)kafka,需要对这些ods kakfa做一些etl操作后写入下一层的kafka(dwd层)。

一开始采用的是executeSql方式来执行每个ods→dwd层操作,即类似:

 def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)val configuration: Configuration = tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction("etl_handle", classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dml,在insert语句中调用etl_handle进行预处理和写入tableEnv.executeSql(INSERT_DWD_TABLE1)tableEnv.executeSql(INSERT_DWD_TABLE2)... 
}

当有多个ods->dwd操作放在同一个flink作业中时,发现这种方式会导致每次insert操作都是单独的DAG,非常消耗资源,特别是这些处理都是比较轻量级的,最好是能融合在同一个DAG中共享资源。

解决方案

查看flink文档:INSERT 语句 | Apache Flink

因此,可以采用statementset的方式,将不同insert sql进行分组执行,每组的insert sql会先被缓存到 StatementSet 中,并在StatementSet.execute() 方法被调用时,同一组的 insert sql(sink) 会被优化成一张DAG共用taskmanager,减少资源浪费,即类似:

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)val configuration: Configuration = tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction("etl_handle", classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dmltableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE1).addInsertSql(INSERT_DWD_TABLE2).addInsertSql(INSERT_DWD_TABLE3).execute()tableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE4).addInsertSql(INSERT_DWD_TABLE5).addInsertSql(INSERT_DWD_TABLE6).execute()
}

其他

如果是纯flink sql而不用data stream api,也是可以达到同样的效果的。

http://www.lryc.cn/news/390208.html

相关文章:

  • FineDataLink4.1.9支持Kettle调用
  • SwanLinkOS首批实现与HarmonyOS NEXT互联互通,软通动力子公司鸿湖万联助力鸿蒙生态统一互联
  • Win11禁止右键菜单折叠的方法
  • Maven列出所有的依赖树
  • 测试开发面试题和答案
  • llm学习-3(向量数据库的使用)
  • 【01-02】Mybatis的配置文件与基于XML的使用
  • Linux-进程间通信(IPC)
  • C++ STL: std::vector与std::array的深入对比
  • 哈哈看到这条消息感觉就像是打开了窗户
  • 10、matlab中字符、数字、矩阵、字符串和元胞合并为字符串并将字符串以不同格式写入读出excel
  • 如何正确面对GPT-5技术突破
  • HarmonyOS ArkUi 官网踩坑:单独隐藏导航条无效
  • 解决跨域问题(vite、axios/koa)
  • echarts实现3D柱状图(视觉层面)
  • K8S集群进行分布式负载测试
  • 20.《C语言》——【移位操作符】
  • 你想活出怎样的人生?
  • py黑帽子学习笔记_burp
  • selenium,在元素块下查找条件元素
  • 认识String类
  • 计算机图形学入门23:蒙特卡洛路径追踪
  • 探索 TensorFlow 模型的秘密:TensorBoard 详解与实战
  • yolov8obb角度预测原理解析
  • CICD之Git版本管理及基本应用
  • Python作用域及其应用
  • 谷歌上架,应用被Google play下架之后,活跃用户会暴跌?这是为什么?
  • web安全渗透测试十大常规项(一):web渗透测试之Fastjson反序列化
  • Unity 3D软件下载安装;Unity 3D游戏制作软件资源包获取!
  • PyTorch之nn.Module与nn.functional用法区别