当前位置: 首页 > news >正文

Flink Table API/SQL 多分支sink

背景

在某个场景中,需要从Kafka中获取数据,经过转换处理后,需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错:

public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);StreamStatementSet streamStatementSet = tEnv.createStatementSet();String s = LocalDateTimeUtils.getDateTime(System.currentTimeMillis());DataStream<String> dataStream = env.fromElements(s, LocalDateTimeUtils.getDateTime(System.currentTimeMillis()));tEnv.executeSql(KAFKA_TABLE_SQL);tEnv.executeSql(KAFKA_TABLE_SQL_1);Table table = tEnv.fromDataStream(dataStream);table.insertInto("kafka_sink").execute();table.insertInto("kafka_sink_1").execute();streamStatementSet.execute();}
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:199) ~[flink-dist-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:187) ~[flink-dist-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110) ~[?:?]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:877) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:756) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:955) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:57) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]

解决

使用 StreamStatementSet. 具体参考官网:
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-between-datastream-and-table

改良后的代码:

public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);StreamStatementSet streamStatementSet = tEnv.createStatementSet();String s = LocalDateTimeUtils.getDateTime(System.currentTimeMillis());DataStream<String> dataStream = env.fromElements(s, LocalDateTimeUtils.getDateTime(System.currentTimeMillis()));tEnv.executeSql(KAFKA_TABLE_SQL);tEnv.executeSql(KAFKA_TABLE_SQL_1);Table table = tEnv.fromDataStream(dataStream);streamStatementSet.addInsert("kafka_sink", table);streamStatementSet.addInsert("kafka_sink_1", table);streamStatementSet.execute();}
http://www.lryc.cn/news/135289.html

相关文章:

  • Vue3 中 导航守卫 的使用
  • 云原生概论
  • hive-sql
  • Rspack 创建 vue2/3 项目接入 antdv(rspack.config.js 配置 less 主题)
  • 基于centos7完成docker服务的一些基础操作
  • Microsoft Visual Studio + Qt插件编程出现错误error MSB4184问题
  • QT Quick之quick与C++混合编程
  • Ros noetic Move_base 相关状态位置的获取 实战使用教程
  • 【SpringBoot】SpringBoot项目与Vue对接接口的步骤
  • Glog安装与使用
  • windows开发环境搭建
  • 8月17日上课内容 第三章 LVS+Keepalived群集
  • Threejs学习05——球缓冲几何体背景贴图和环境贴图
  • LVS+Keepalived群集实验
  • 软考高级之系统架构师之系统开发基础
  • Web 3.0 安全风险,您需要了解这些内容
  • 万宾科技22款产品入选《城市生命线安全工程监测技术产品名录》
  • MFC 隐藏窗口
  • Java数据库连接池原理及spring boot使用数据库连接池(HikariCP、Druid)
  • 百度商业AI 技术创新大赛赛道二:AIGC推理性能优化TOP10之经验分享
  • 微服务时代java异常捕捉
  • Hadoop支持LZO压缩
  • vue3 01-setup函数
  • iOS swift 类似AirDrop的近场数据传输 MultipeerConnectivity 框架
  • Lnton羚通云算力平台OpenCV-PythonCanny边缘检测教程
  • 2023-8-23 滑动窗口
  • SOA通信中间件常用的通信协议
  • 解决npm安装依赖失败,node和node-sass版本不匹配的问题
  • 2023 网络建设与运维 X86架构计算机操作系统安装与管理题解
  • LAMP 架构及Discuz论坛与Wordpress博客搭建