当前位置: 首页 > news >正文

Flink自定义函数

一、UDF 核心原理

Flink 自定义函数(UDF)是扩展 Table API/SQL 能力的核心机制,允许将自定义逻辑嵌入查询。其设计遵循以下原则:

1. 函数类型体系

类型输入输出关系核心用途
标量函数(ScalarFunction)0~N 个标量 → 1 个标量字段转换、值计算
表值函数(TableFunction)0~N 个标量 → 多行多列数据拆分、关联外部数据
聚合函数(AggregateFunction)多行标量 → 1 个标量自定义聚合(如加权平均)
表值聚合函数(TableAggregateFunction)多行标量 → 多行多列分组TopN、分桶统计等
异步表值函数异步查询外部系统 → 多行多列高效关联外部数据库/API

2. 类型系统

  • 标量/表值函数使用新数据类型系统(基于DataTypes
  • 聚合函数仍使用旧类型系统(基于TypeInformation
  • 类型推导:默认通过反射获取,复杂场景可通过@DataTypeHint@FunctionHint注解显式指定

3. 执行逻辑

  • 核心是求值方法(如eval()accumulate()),定义数据处理逻辑
  • 生命周期:open()初始化 → 求值方法调用 → close()资源清理
  • 确定性:通过isDeterministic()声明是否返回确定结果(影响优化策略)

二、快速上手实战

1. 标量函数(ScalarFunction)

作用:对输入标量做转换计算(如字符串处理、格式转换)

实现步骤

  1. 继承ScalarFunction,实现eval()方法
    public class HashFunction extends ScalarFunction {// 输入任意类型,返回哈希值public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {return o.hashCode();}
    }
    
  2. 注册与调用
    // 注册
    tableEnv.createTemporarySystemFunction("HashFunc", HashFunction.class);
    // Table API 调用
    table.select(call("HashFunc", $("field")));
    // SQL 调用
    tableEnv.sqlQuery("SELECT HashFunc(field) FROM t");
    

2. 表值函数(TableFunction)

作用:将单行输入拆分为多行输出(如字符串按分隔符拆分)

实现步骤

  1. 继承TableFunction<T>,通过collect()输出结果
    @FunctionHint(output = @DataTypeHint("ROW<word STRING, len INT>"))
    public class SplitFunction extends TableFunction<Row> {public void eval(String str) {for (String s : str.split(" ")) {collect(Row.of(s, s.length())); // 输出每行数据}}
    }
    
  2. 注册与调用
    tableEnv.createTemporarySystemFunction("SplitFunc", SplitFunction.class);
    // 关联查询(LATERAL JOIN)
    tableEnv.sqlQuery("""SELECT t.id, s.word, s.len FROM t, LATERAL TABLE(SplitFunc(t.content)) AS s(word, len)
    """);
    

3. 聚合函数(AggregateFunction)

作用:多行数据聚合为单个值(如自定义平均值、求和逻辑)

实现步骤

  1. 定义累加器(存储中间结果)
    public class WeightedAvgAccum {public long sum = 0;   // 加权和public int count = 0; // 权重总和
    }
    
  2. 继承AggregateFunction,实现核心方法
    public class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {@Overridepublic WeightedAvgAccum createAccumulator() { return new WeightedAvgAccum(); }// 累加逻辑public void accumulate(WeightedAvgAccum acc, long value, int weight) {acc.sum += value * weight;acc.count += weight;}// 最终结果计算@Overridepublic Long getValue(WeightedAvgAccum acc) {return acc.count == 0 ? null : acc.sum / acc.count;}
    }
    
  3. 注册与调用
    tableEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);
    tableEnv.sqlQuery("""SELECT user, WeightedAvg(score, weight) FROM scores GROUP BY user
    """);
    

4. 表值聚合函数(TableAggregateFunction)

作用:多行数据聚合为多行结果(如分组取TopN)

实现步骤

  1. 定义累加器(存储中间状态)
    public class Top2Accum {public int first;  // 第一名public int second; // 第二名
    }
    
  2. 继承TableAggregateFunction,实现核心方法
    public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {@Overridepublic Top2Accum createAccumulator() {Top2Accum acc = new Top2Accum();acc.first = Integer.MIN_VALUE;acc.second = Integer.MIN_VALUE;return acc;}// 累加逻辑public void accumulate(Top2Accum acc, int value) {if (value > acc.first) {acc.second = acc.first;acc.first = value;} else if (value > acc.second) {acc.second = value;}}// 输出结果public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {out.collect(Tuple2.of(acc.first, 1));out.collect(Tuple2.of(acc.second, 2));}
    }
    
  3. 注册与调用
    tableEnv.createTemporarySystemFunction("Top2", Top2.class);
    // Table API 调用(SQL暂不支持)
    table.groupBy($("group")).flatAggregate(call(Top2.class, $("value")).as("val", "rank")).select($("group"), $("val"), $("rank"));
    

三、关键技巧

  1. 类型注解:复杂类型用@DataTypeHint指定,例如:

    @DataTypeHint("DECIMAL(12, 3)") // 声明 decimal 精度
    public BigDecimal eval(double a) { ... }
    
  2. 命名参数:通过@ArgumentHint指定参数名,支持 SQL 中按名传参:

    public String eval(@ArgumentHint(name = "content") String s,@ArgumentHint(name = "begin") int b
    ) { ... }
    // SQL 调用:SELECT func(content => 'abc', begin => 1)
    
  3. 确定性声明:非确定性函数(如随机数、当前时间)需重写:

    @Override
    public boolean isDeterministic() { return false; }
    

四、常见问题

  • 注册方式:临时注册(createTemporarySystemFunction)仅当前会话有效,永久注册需结合 Catalog
  • 权限控制:UDF 可访问外部资源(如数据库连接),需确保执行环境有对应权限
  • 性能优化:聚合函数尽量实现merge()方法,支持两阶段聚合优化

通过上述步骤,可快速实现各类自定义逻辑,扩展 Flink 处理能力。核心是理解不同函数的输入输出关系,以及累加器(聚合函数)的设计逻辑。

http://www.lryc.cn/news/584570.html

相关文章:

  • 一个编辑功能所引发的一场知识探索学习之旅(JavaScript、HTML)
  • Android 插件化实现原理详解
  • 虚拟储能与分布式光伏协同优化:新型电力系统的灵活性解决方案
  • Datawhale AI 夏令营:基于带货视频评论的用户洞察挑战赛 Notebook(下篇)
  • Chromium 引擎启用 Skia Graphite后性能飙升
  • 【TGRS 2025】新型:残差Haar离散小波变换下采样,即插即用!
  • 从零构建MVVM框架:深入解析前端数据绑定原理
  • 深入理解 Linux 中的 stat 函数与文件属性操作
  • NGINX系统基于PHP部署应用
  • 开发需要写单元测试吗?
  • Camera2API笔记
  • 记录一下openGauss自启动的设置
  • 《测试开发:从技术角度提升测试效率与质量》
  • io_helper说明
  • 使用Word/Excel管理需求的10个痛点及解决方案Perforce ALM
  • 二层环路避免-STP技术
  • LangChain框架 Prompts、Agents 应用
  • Selenium 4 教程:自动化 WebDriver 管理与 Cookie 提取 || 用于解决chromedriver版本不匹配问题
  • C++实习面试题
  • dexie 前端数据库封装
  • 【前端】jQuery数组合并去重方法总结
  • MinerU2将PDF转成md文件,并分拣图片
  • uniapp滚动组件, HuimayunScroll:高性能移动端滚动组件的设计与实现
  • 【Fargo】发送一个rtp包的过程1:怎么统一加twcc序号
  • 创始人IP如何进阶?三次关键突破实现高效转化
  • 使用SpringAOP自定义权限控制注解
  • 音频 SDP 文件格式
  • ElementUI:高效优雅的Vue.js组件库
  • Linux epoll简介与C++TCP服务器代码示例
  • Rust中Option和Result详解