当前位置: 首页 > news >正文

【flink sql table api】时间属性的指定与使用注意事项

文章目录

  • 一. 时间属性介绍
  • 二. Table api指定时间属性
  • 三. 处理时间的指定
    • 1. 在创建表的 DDL 中定义
    • 2. 在 DataStream 到 Table 转换时定义
    • 3. 使用 TableSource 定义
  • 四. 事件时间的指定
    • 1. 在 DDL 中定义
    • 2. 在 DataStream 到 Table 转换时定义
    • 3. 使用 TableSource 定义
  • 五. 小结

Flink 可以基于几种不同的 时间 概念来处理数据。

  • 处理时间 指的是执行具体操作时的机器时间(大家熟知的绝对时间, 例如 Java的 System.currentTimeMillis()) )
  • 事件时间 指的是数据本身携带的时间。这个时间是在事件产生时的时间。
  • 摄入时间 指的是数据进入 Flink 的时间;在系统内部,会把它当做事件时间来处理。

本页面说明了如何在 Flink Table API & SQL 里面定义时间以及相关的操作。
 

一. 时间属性介绍

像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。

时间属性声明

  • 在CREATE TABLE DDL创建表的时候指定
  • 在 DataStream 中指定
  • 在定义 TableSource 时指定

一旦时间属性定义好,就可以像普通列一样使用,也可以在时间相关的操作中使用。

 
时间属性的传递和物化

  • 只要时间属性没有被修改,而是简单地从一个表传递到另一个表,它就仍然是一个有效的时间属性。
  • 时间属性可以像普通的时间戳的列一样被使用和计算。一旦时间属性被用在了计算中,它就会被物化,进而变成一个普通的时间戳。

注意:

普通的时间戳是无法跟 Flink 的时间以及watermark等一起使用的,所以普通的时间戳就无法用在时间相关的操作中。

 

二. Table api指定时间属性

Table API 程序需要在 streaming environment 中指定时间属性:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default// 或者:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

 

三. 处理时间的指定

处理时间是基于机器的本地时间来处理数据,它是最简单的一种时间概念,但是它不能提供确定性它既不需要从数据里获取时间,也不需要生成 watermark。

共有三种方法可以定义处理时间。

1. 在创建表的 DDL 中定义

处理时间属性可以在创建表的 DDL 中用计算列的方式定义,用 PROCTIME() 就可以定义处理时间,函数 PROCTIME() 的返回类型是 TIMESTAMP_LTZ 。

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

 

2. 在 DataStream 到 Table 转换时定义

ing
 

3. 使用 TableSource 定义

ing
 

四. 事件时间的指定

事件时间允许程序按照数据中包含的时间来处理,这样可以在数据乱序或者晚到情况下产生一致的处理结果。
它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。

为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。

 
同样事件时间的指定也有三种方式

1. 在 DDL 中定义

事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。

WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段

Flink 支持和在 TIMESTAMP(不带时区) 列和 TIMESTAMP_LTZ(带有本地时区) 列上定义事件时间。

如果源数据中的时间戳数据表示为年-月-日-时-分-秒,则通常为不带时区信息的字符串值,例如 2020-04-15 20:13:40.564,建议将事件时间属性定义在 TIMESTAMP(不带时区) 列上:

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermarkWATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

 

当源数据中的时间戳数据表示为一个纪元 (epoch) 时间,通常是一个 long 值,例如 1618989564564,此时建议将事件时间属性定义在 TIMESTAMP_LTZ 列上:

CREATE TABLE user_actions (user_name STRING,data STRING,ts BIGINT,time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),-- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategyWATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);

Epoch Time 是一种计算机系统中常用的时间表示方法,它以秒为单位从一个特定时间点(通常是1970年1月1日午夜UTC)开始计算时间,用于在计算机系统中跟踪和比较时间戳。

 

2. 在 DataStream 到 Table 转换时定义

ing

3. 使用 TableSource 定义

ing
 

五. 小结

本文讨论了flink sql中时间属性的指定方法,其中有几点细节:

  1. 普通的时间戳无法用在时间相关的操作中,需要进行时间属性的定义
  2. 通过PROCTIME()或WATERMARK关键字可以在create语句中分别定义处理时间和事件时间类型的时间属性
  3. 时间属性定义好后,就可以像普通列一样使用,也可以在时间相关的操作中使用
  4. 一旦时间属性被用在了计算中,它就会被物化,进而变成一个普通的时间戳。也就无法进行时间相关操作。

 
参考:
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/concepts/time_attributes/#%E5%9C%A8-ddl-%E4%B8%AD%E5%AE%9A%E4%B9%89

http://www.lryc.cn/news/210818.html

相关文章:

  • 评价模型:CRITIC客观赋权法
  • 两个Tomcat插件配置不同端口,session冲突,同时登录被挤下线问题的解决
  • Mybatis中执行Sql的执行过程
  • IEEE Standard for SystemVerilog—Chapter 25.7 Tasks and functions in interfaces
  • 一台服务器最大能支持多少条 TCP 连接
  • Qt重定向QDebug,Qt/C++开源作品39-日志输出增强版V2022
  • linux入门---多线程的控制
  • 基于android的 rk3399 同时支持多个USB摄像头
  • 【Qt之控件QTreeView】设置单元格高度、设置图标尺寸
  • 力扣42.接雨水(java,暴力法、前缀和解法)
  • hdlbits系列verilog解答(移位寄存器)-23
  • Linux命令记载
  • Flume 快速入门【概述、安装、拦截器】
  • 【pandas技巧】group by+agg+transform函数
  • 一文解读WordPress网站的各类缓存-老白博客
  • 从零开始:开发直播商城APP的技术指南
  • GZ035 5G组网与运维赛题第6套
  • 分类预测 | Matlab实现KOA-CNN-GRU-selfAttention多特征分类预测(自注意力机制)
  • 【Qt】QString怎么转成int
  • ubuntu 22.04 安装python-pcl
  • 【题解】[GenshinOI Round 3 ]P9817 lmxcslD
  • 在pycharm中,远程操作服务器上的jupyter notebook
  • SQL 运算符
  • 中间件安全-CVE 复现K8sDockerJettyWebsphere漏洞复现
  • 系列九、什么是Spring bean
  • 轻量封装WebGPU渲染系统示例<4>-CubeMap/天空盒(源码)
  • Linux 环境变量 二
  • Beyond Compare4 30天试用到期的解决办法
  • sentinel规则持久化-规则同步nacos-最标准配置
  • 【Linux】tail命令使用