当前位置: 首页 > news >正文

pyflink datastream数据流ds经过一系列转换后转为table,t_env.from_data_stream(ds)

在 pyflink 处理数据流过程中,有时候需要将data_stream转为table,下面是正确的方式,即每一个算子(map,reduce, window)操作之后需要指定输出数据类型。

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, StreamTableEnvironment, Schemaenv = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
ds = env.from_collection([(12, "Alice"), (0, "Bob")], type_info=Types.TUPLE([Types.LONG(), Types.STRING()]))def update_tel(data):return data## 正确用法,每一步操作算子之后都需要加上输出的数据类型 output_type
ds = ds.map(lambda x: update_tel(x), output_type=Types.TUPLE([Types.INT(), Types.STRING()]))
#input_table = t_env.from_data_stream(ds).alias("score", "name")
input_table = t_env.from_data_stream(ds)
input_table.print_schema()
t_env.create_temporary_view("MyView", input_table)
t_env.from_path("MyView").print_schema()# 输出:
#(
#  `f0` INT NOT NULL,
#  `f1` STRING
#)
"""
## 错误用法,不指定output_type
ds = ds.map(lambda x: update_tel(x))
#input_table = t_env.from_data_stream(ds).alias("score", "name")
input_table = t_env.from_data_stream(ds)
input_table.print_schema()
t_env.create_temporary_view("MyView", input_table)
t_env.from_path("MyView").print_schema()输出:
(`f0` RAW('[B', '...')
)
"""

参考:
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/python/datastream_tutorial/
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/data_stream_api/
https://github.com/apache/flink/tree/release-1.16/flink-python/pyflink/examples/datastream

http://www.lryc.cn/news/486126.html

相关文章:

  • vxe-grid table 校验指定行单元格的字段,只校验某个列的字段
  • 【Java多线程】单例模式(饿汉模式和懒汉模式)
  • python 异步编程之协程
  • 现代密码学|古典密码学例题讲解|AES数学基础(GF(2^8)有限域上的运算问题)| AES加密算法
  • 算法沉淀一:双指针
  • Word_小问题解决_1
  • 基于opencv制作GUI界面
  • 微服务即时通讯系统的实现(客户端)----(2)
  • QT使用libssh2库实现sftp文件传输
  • 【Linux】进程的优先级
  • python实现十进制转换二进制,tkinter界面
  • 电子应用设计方案-12:智能窗帘系统方案设计
  • 力扣 回文链表-234
  • 采样率22050,那么CHUNK_SIZE 一次传输的音频数据大小设置多少合适?unity接收后出现卡顿的问题的思路
  • 网络初识--Java
  • K8S单节点部署及集群部署
  • GPIO相关的寄存器(重要)
  • OpenCV基础
  • 两行命令搭建深度学习环境(Docker/torch2.5.1+cu118/命令行美化+插件),含完整的 Docker 安装步骤
  • Redis做分布式锁
  • lambdaQueryWrapper详细解释
  • 【工控】线扫相机小结 第三篇
  • golang中的init函数
  • 理解和选择Vue的组件风格:组合式API与选项式API详解
  • Java基础——高级技术
  • 什么是SSL VPN?其中的协议结构是怎样的?
  • 程序员高频率面试题-整理篇
  • 第二十二章 TCP 客户端 服务器通信 - TCP设备的OPEN和USE命令关键字
  • CSS 语法规范
  • Linux开发常用命令