当前位置: 首页 > news >正文

Flink 对接 Hudi 查询数据,java代码编写

1.pom.xml文件需要引入下面包

    <properties><flink.version>1.15.4</flink.version><hudi.version>0.13.1</hudi.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- hudi --><dependency><groupId>org.apache.hudi</groupId><artifactId>hudi-flink1.15-bundle</artifactId><version>${hudi.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.hudi</groupId><artifactId>hudi-flink-client</artifactId><version>0.14.1</version></dependency></dependencies>

2.java代码如下

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.Test;import java.util.List;public class HudiTest {@Testpublic void test01() throws Exception {StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment  tableEnv = StreamTableEnvironment.create(streamEnv);tableEnv.executeSql("CREATE TABLE IF NOT EXISTS table_name (\n" +"    resume_id bigint,\n" +"    update_by STRING,\n" +"    gmt_modified Timestamp ,\n" +"    del_flag int, \n" +"    invite_operation_date STRING,\n" +"    induct_date date ,\n" +"    leave_date date ,\n" +"    PRIMARY KEY (resume_id) NOT ENFORCED\n" +") with (\n" +" 'connector' = 'hudi',\n" +" 'path' = 'hdfs://177.17.17.200:8088/user/hudi/resume_demo/hr_resume',\n" +" 'table.type' = 'MERGE_ON_READ'\n" +")");Table table = tableEnv.sqlQuery("select * from table_name");// 启动 Flink 作业DataStream<Row> dataStream = tableEnv.toDataStream(table);streamEnv.execute();List<Row> rows = dataStream.executeAndCollect(100);//收集100条数据for (Row row : rows) {StringBuilder rowString = new StringBuilder();for (int i = 0; i < row.getArity(); i++) {rowString.append(row.getField(i)).append("|");}System.out.println(rowString.toString());}}
}

3.说明

经过测试,这里的sql中,支持下面的一些sql
简单where条件
limit 10 offset 0
不支持
order by
http://www.lryc.cn/news/351585.html

相关文章:

  • 计算机操作系统总结(1)
  • HTML5好看的通用网站模板源码
  • AWS安全性身份和合规性之Inspector
  • mybatis plus 配置多数据源(数据源进行切换)
  • Docker-Android安卓模拟器本地部署并实现远程开发测试
  • vue-封装上下(垂直方向)轮播
  • 海外私人IP和原生IP有什么区别,谁更有优势?
  • 主流容器工具对比以及重点推荐学习的企业级工具
  • 国产linux系统(银河麒麟,统信uos)使用 PageOffice 国产版在线编辑word文件,同时保存数据和文件
  • 个人感觉对Material设计有用的几个网址
  • ubuntu18 安装sudo
  • 【力扣一轮】202.快乐数 1.两数之和
  • Vue小程序项目知识积累(二)
  • RK3588 Android13 预安装自己的apk应用及把这个应用设置为默认桌面
  • NLP(16)--生成式任务
  • 直播回放| 机器人任务挑战赛线上培训资料合集
  • flask Web应用的接口调试
  • 简单易懂的 API 集成测试方法
  • leetcode 239. 滑动窗口最大值、347.前 K 个高频元素
  • npm常用指令
  • 数字孪生技术在管理中有哪些实际应用?
  • LeetCode/NowCoder-链表经典算法OJ练习3
  • 如何理解HTML语义化
  • Solved problem: The number of elements in the character array
  • Flume Channels简介及官方用例
  • 【AI】如何用非Docker方法安装类GPT WebUI
  • 2024年ai知识库:特点、应用与搭建
  • 查询一个字符串在另一个字符串中出现的次数(java)
  • Docker in Docker 原理与实战
  • Rust学习心得