当前位置: 首页 > news >正文

0基础学习PyFlink——不可以用UDTAF装饰器装饰function的原因分析

在研究Flink的“用户自定义方法”(UserDefinedFunction)时,我们看到存在如下几种类型的装饰器:

  1. UDF:User Defined Scalar Function
  2. UDTF:User Defined Table Function
  3. UDAF:User Defined Aggregate Function
  4. UDTAF:User Defined Table Aggregate Function

在很多案例中,我们看到udf、udtf和udaf几个装饰器修饰function

@udf(result_type=DataTypes.BIGINT())
def add(i, j):return i + j@udtf(result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()])
def range_emit(s, e):for i in range(e):yield s, i@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):return v.mean()

但是没有见到udtaf修饰function的案例,比如

# 错误的
@udtaf(result_type=DataTypes.ROW([DataTypes.FIELD("word", DataTypes.STRING()) , DataTypes.FIELD("count", DataTypes.BIGINT())]), accumulator_type=DataTypes.ROW([DataTypes.FIELD("word", DataTypes.STRING())]), func_type="general")
def lower(line):yield Row('a', 1)

这是因为这儿存在一个悖论

udtaf要求func_type必须是general

def udtaf(f: Union[Callable, TableAggregateFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,accumulator_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None,func_type: str = 'general') -> Union[UserDefinedAggregateFunctionWrapper, Callable]:"""Helper method for creating a user-defined table aggregate function.:param f: user-defined table aggregate function.:param input_types: optional, the input data types.:param result_type: the result data type.:param accumulator_type: optional, the accumulator data type.:param deterministic: the determinism of the function's results. True if and only if a call tothis function is guaranteed to always return the same result given thesame parameters. (default True):param name: the function name.:param func_type: the type of the python function, available value: general(default: general):return: UserDefinedAggregateFunctionWrapper or function... versionadded:: 1.13.0"""if func_type != 'general':raise ValueError("The func_type must be 'general', got %s."% func_type)if f is None:return functools.partial(_create_udtaf, input_types=input_types, result_type=result_type,accumulator_type=accumulator_type, func_type=func_type,deterministic=deterministic, name=name)else:return _create_udtaf(f, input_types, result_type, accumulator_type, func_type,deterministic, name)

如果func_type不是’general’,则会抛出错误,所以func_type="pandas"是不可以的。
udtaf修饰方法后的返回类型是UserDefinedAggregateFunctionWrapper。

def _create_udtaf(f, input_types, result_type, accumulator_type, func_type, deterministic, name):return UserDefinedAggregateFunctionWrapper(f, input_types, result_type, accumulator_type, func_type, deterministic, name, True)

delegate function要求非func_type必须是pandas

Table API下只有这些方法接受udtaf修饰function返回的UserDefinedAggregateFunctionWrapper。

  • def aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) -> ‘AggregatedTable’
  • def flat_aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) -> ‘FlatAggregateTable’

这些方法的在底层会调用被修饰的UserDefinedFunctionWrapper。

    def aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) \-> 'AggregatedTable':"""Performs a global aggregate operation with an aggregate function. You have to close theaggregate with a select statement... versionadded:: 1.13.0"""if isinstance(func, Expression):return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)else:func._set_takes_row_as_input()if hasattr(func, "_alias_names"):alias_names = getattr(func, "_alias_names")func = func(with_columns(col("*"))).alias(*alias_names)else:func = func(with_columns(col("*")))return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)

进而会调用到_java_user_defined_function。由于udtaf修饰的方法不是UserDefinedFunction对象,而是一个function,所以它会通过_create_delegate_function创建新的func 。

class UserDefinedFunctionWrapper(object):
……def _java_user_defined_function(self):……if not isinstance(self._func, UserDefinedFunction):func = self._create_delegate_function()……

而_create_delegate_function则要求udtaf中的function的func_type必须是pandas

    def _create_delegate_function(self) -> UserDefinedFunction:assert self._func_type == 'pandas'return DelegatingPandasAggregateFunction(self._func)

这就和之前udtaf中要求func_type必须是general相背。
所以我们没看到udtaf修饰function的案例。

http://www.lryc.cn/news/206553.html

相关文章:

  • Spring Boot Endpoints:端点
  • 漏洞复现--用友 畅捷通T+ .net反序列化RCE
  • PHP 共享茶室棋牌室无人软硬件结合开发小程序系统的开发优势
  • kibana监控
  • 基于 ARM+FPGA+AD平台的多类型同步信号采集仪开发及试验验证(二)板卡总体设计
  • uniapp: 本应用使用HBuilderX x.x.xx 或对应的cli版本编译,而手机端SDK版本是 x.x.xx。不匹配的版本可能造成应用异常。
  • sqoop和flume简单安装配置使用
  • 什么是React Router?它的作用是什么?
  • 界面控件DevExtreme v23.1 - UI组件 UI模板库增强
  • Fedora Linux 38下Mariadb数据库设置utf8mb4字符编码
  • 【单元测试】--高级主题
  • 面向对象程序设计(2023年10月)
  • 常用正在表达式
  • ES6初步了解Map对象(含十种方法)
  • 推荐一款可以识别m3u8格式ts流批量下载并且合成mp4视频的chrome插件——猫抓
  • 文本处理方法及其在NLP中的应用
  • html文字一行时靠右,多行时靠左
  • Stable-diffusion-webui
  • Python中的文件操作和异常处理
  • KF-GINS 和 OB-GINS 的 Earth类 和 Rotation 类
  • 2017年亚太杯APMCM数学建模大赛B题喷雾轨迹规划问题求解全过程文档及程序
  • 柏拉图式爱情是同性之爱,绘画是理念世界的二次模仿
  • 【滴滴出行安全应急响应平台DSRC2倍积分卡】
  • HashMap 元素添加流程
  • 甲亢_甲状腺功能亢进_Methimazole甲巯基咪唑
  • 【Maven】VSCode Java+Maven 环境配置
  • 【目标检测】非极大值抑制NMS的原理与实现
  • 应用程序架构是如何演变的
  • 云原生Docker Cgroups资源控制操作
  • 【Java集合类面试二十五】、有哪些线程安全的List?