当前位置: 首页 > news >正文

flink table view datastream互转

case class outer(f1:String,f2:Inner)
case class outerV1(f1:String,f2:Inner,f3:Int)
case class Inner(f3:String,f4:Int)

测试代码

package com.yy.table.convertimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.types.DataTypeobject streamPOJO2table {case class outer(f1:String,f2:Inner)case class outerV1(f1:String,f2:Inner,f3:Int)case class Inner(f3:String,f4:Int)def main(args: Array[String]): Unit = {// flink1.13 流处理环境初始化val env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = StreamTableEnvironment.create(env)import org.apache.flink.streaming.api.scala._val ds1: DataStream[outer] = env.fromElements(outer("a",Inner("b",2)),outer("d",Inner("e",4)))val table1: Table = tEnv.fromDataStream(ds1)
//    table1
//      .execute()
//      .print()/*+----+--------------------------------+--------------------------------+
| op |                             f1 |                             f2 |
+----+--------------------------------+--------------------------------+
| +I |                              a |                   (f3=b, f4=2) |
| +I |                              d |                   (f3=e, f4=4) |
+----+--------------------------------+--------------------------------+*///    table1
//      .print()/*5> +I[d, Inner(e,4)]
4> +I[a, Inner(b,2)]*/tEnv.createTemporaryView("view1", ds1)val tableResult1: TableResult = tEnv.executeSql("select f1,f2,(f2.f4 + 100) as f3 from view1")tableResult1.print()/*+----+--------------------------------+--------------------------------+-------------+
| op |                             f1 |                             f2 |          f3 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                              a |                   (f3=b, f4=2) |         102 |
| +I |                              d |                   (f3=e, f4=4) |         104 |
+----+--------------------------------+--------------------------------+-------------+*///val t1: Table = tEnv.sqlQuery("select f1,f2,(f2.f4 + 100) as f3 from view1")
//    t1.print()//    println(t1.getResolvedSchema)/*
+----+--------------------------------+--------------------------------+-------------+
| op |                             f1 |                             f2 |          f3 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                              a |                   (f3=b, f4=2) |         102 |
| +I |                              d |                   (f3=e, f4=4) |         104 |
+----+--------------------------------+--------------------------------+-------------+
2 rows in set
(`f1` STRING,`f2` *com.yy.table.convert.streamPOJO2table$Inner<`f3` STRING, `f4` INT NOT NULL>* NOT NULL,`f3` INT NOT NULL
)*/println("---- 1 -------")// tableResult转datastreamval o1: DataStream[outerV1] = tEnv.toDataStream[outerV1](t1,classOf[outerV1])
//    o1.print()println("---- 2 -------")tEnv.executeSql("""|select|f1|,f2.f3|,f2.f4|from view1|""".stripMargin)
//      .print()/*+----+--------------------------------+--------------------------------+--------------------------------+
| op |                             f1 |                             f3 |                             f4 |
+----+--------------------------------+--------------------------------+--------------------------------+
| +I |                              a |                              b |                              c |
| +I |                              d |                              e |                              f |
+----+--------------------------------+--------------------------------+--------------------------------+*/tEnv.executeSql("""|select|f1|,(f2.f3,f2.f4)|from view1|""".stripMargin)
//      .print()env.execute("jobName1")}}

http://www.lryc.cn/news/272973.html

相关文章:

  • redis重启后数据丢失问题解决(亲测好用)
  • 敬请期待……
  • 3.10 Android eBPF HelloWorld调试(四)
  • PyTorch常用工具(1)数据处理
  • docker-简单说说cgroup
  • 印象笔记04: 如何将印象笔记超级会员价值最大化利用?
  • 我的JDK动态代理流程
  • uniapp Vue3 面包屑导航 带动态样式
  • openGauss学习笔记-174 openGauss 数据库运维-备份与恢复-导入数据-管理并发写入操作
  • 数据分析可被划分为4个重要的类别
  • 爆火小游戏敲木鱼流量主小程序源码系统+完整的代码包以及安装搭建教程
  • Invoke和BeginInvoke的区别
  • 3 分钟为英语学习神器 Anki 部署一个专属同步服务器
  • <HarmonyOS第一课>应用程序框架
  • SQL 解析 — 如何轻松实现新增语句
  • Android集成OpenSSL实现加解密-集成
  • 代码随想录算法训练营Day18|513.找树左下角的值、112. 路径总和、113. 路径总和ii、106.从中序与后序遍历序列构造二叉树
  • 【蓝桥备赛】技能升级——二分查找
  • zyqn-arm软中断设置
  • k8s---pod基础下
  • 玩转朋友圈!这样运营朋友圈吸睛又吸金!
  • react学习
  • vue-cli项目中vue.config.js的配置
  • Github 2024-01-04 开源项目日报 Top10
  • 使用GPTs+Actions自动获取第三方数据
  • git提交操作(不包含初始化仓库)
  • 使用YOLOv8和Grad-CAM技术生成图像热图
  • Vue: 多个el-select不能重复选择相同属性
  • 金色麦芒的2023
  • java设计模式学习之【策略模式】