当前位置: 首页 > news >正文

Flink SQL自定义标量函数(Scalar Function)

使用场景: 标量函数即 UDF,⽤于进⼀条数据出⼀条数据的场景。

开发流程:

  • 实现 org.apache.flink.table.functions.ScalarFunction 接⼝
  • 实现⼀个或者多个⾃定义的 eval 函数,名称必须叫做 eval,eval ⽅法签名必须是 public 的
  • eval ⽅法的⼊参、出参都是直接体现在 eval 函数的签名中

开发案例:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;/*** 输入数据: * nc -lk 88888* a,1** 输出结果:* res1=>:3> +I[97]* res2=>:3> +I[97]* res3=>:3> +I[97]*/
public class ScalarFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<Tuple2<String, String>> tpStream = source.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(String input) throws Exception {return new Tuple2<>(input.split(",")[0], input.split(",")[1]);}});Table table = tEnv.fromDataStream(tpStream, "id,name");tEnv.createTemporaryView("SourceTable",table);// 在 Table API ⾥不经注册直接调⽤函数Table res1 = tEnv.from("SourceTable").select(call(HashFunction.class, $("id")));// 注册函数tEnv.createTemporarySystemFunction("HashFunction", HashFunction.class);// 在 Table API ⾥调⽤注册好的函数Table res2 = tEnv.from("SourceTable").select(call("HashFunction", $("id")));// 在 SQL ⾥调⽤注册好的函数Table res3 = tEnv.sqlQuery("SELECT HashFunction(id) FROM SourceTable");tEnv.toDataStream(res1).print("res1=>");tEnv.toDataStream(res2).print("res2=>");tEnv.toDataStream(res3).print("res3=>");env.execute();}public static class HashFunction extends ScalarFunction {// 接受任意类型输⼊,返回 INT 型输出public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {return o.hashCode();}}
}

测试结果:

在这里插入图片描述

http://www.lryc.cn/news/229287.html

相关文章:

  • 【第2章 Node.js基础】2.4 Node.js 全局对象(二) process 对象
  • 觉非科技发布【轻地图高速NOA智驾方案】|地平线,觉非科技,MobileDrive超捷生态协作实现技术落地
  • 竞赛 车道线检测(自动驾驶 机器视觉)
  • 128. 最长连续序列
  • 设计模式-设计原则
  • MongoDB基础运维
  • 侧击雷如何检测预防
  • 检索搜索信息能力
  • 设计大咖亲授:Figma中文环境设置全攻略!
  • 华为Hcia-数通学习(更改策略)
  • 数据校验:Spring Validation
  • CSS怎么选择除了第一个子元素外的其余同级子元素
  • Mac下eclipse配置JDK
  • 基于springboot实现体育场馆运营平台项目【项目源码】
  • 优雅的Java编程:将接口对象作为方法参数
  • 一文简单聊聊protobuf
  • Unity Meta Quest 一体机开发(五):手势抓取概述
  • 传输层中的TCP和UPD协议
  • 插入排序算法(C++版)
  • Tracking vs. No-Tracking Queries
  • Centos7安装frps实现内网穿透
  • cryptopp Base64Encoder \n问题
  • 一种艺术风格的神经算法:总结与实现
  • 【Mysql系列】Mysql基础篇
  • C++面试题之C++中的指针参数传递和引用参数传递
  • [Android]Unresolved reference: appcompat
  • 网络运维Day14
  • Mac常用软件安装
  • node 文件上传操作(前端 form表单上传 formData上传 后端 node 使用express+multer)
  • 容器数据卷+MYSQL实战