Flink流水线+Gravitino+Paimon集成
1.数据源管理
1.1 添加Gravitino数据源
添加成功之后,会在Gravitino中创建一个名为配置的中的meatalake
1.2. 添加Paimon数据源
属性gravitinoId
可以关联前面创建的Gravitino数据源,关联后,会在gravitino下创建一个该数据源的catalog
。
2. 集成演示
2.1 创建任务
-
入口:通过顶部菜单栏选择 任务开发,或通过快捷入口 快速创建任务。
-
任务类型:选择
FlinkPipeline
。
2.2 配置任务
点击任务名称,进入任务详情页。任务节点如下
功能说明:
使用DataGen
节点生成100条测试数据,使用JDBCWrite
将数据写入mysql的user表,再通过JDBCRead
读取mysql的user表,使用SQLExecute
写入到Paimon。通过SQLQuery
读取Paimon,最后通过ShowData
节点输入读取的数据。
{"flow": {"engineType": "flink","name": "flink_gravition_paimon","paths": [{"from": "Gravitino_tuky5","inport": "","outport": "","to": "DataGen_r6QRQ"},{"from": "DataGen_r6QRQ","inport": "","outport": "","to": "JDBCWrite__yYGc"},{"from": "JDBCWrite__yYGc","inport": "","outport": "","to": "JDBCRead_WREvh"},{"from": "JDBCRead_WREvh","inport": "","outport": "","to": "SQLExecute_bMeVm"},{"from": "SQLExecute_bMeVm","inport": "","outport": "","to": "SQLQuery_8jwT7"},{"from": "SQLQuery_8jwT7","inport": "","outport": "","to": "ShowData_aMIL3"}],"runMode": "DEBUG","stops": [{"bundle": "cn.piflow.bundle.flink.catalog.Gravitino","customizedProperties": {},"name": "Gravitino_tuky5","properties": {"gravitinoUri": "http://127.0.0.1:8092/","metalake": "metalake"},"uuid": "Gravitino_tuky5"},{"bundle": "cn.piflow.bundle.flink.common.DataGen","customizedProperties": {},"name": "DataGen_r6QRQ","properties": {"count": "100","ratio": "10","registerTableName": "datagen_source","schema": "[{\"id\":\"323868\",\"filedName\":\"id\",\"filedType\":\"INT\",\"kind\":\"sequence\",\"start\":\"1\",\"end\":\"1000\"},{\"id\":\"479324\",\"filedName\":\"name\",\"filedType\":\"STRING\",\"kind\":\"random\",\"length\":\"5\",\"index\":1}]"},"uuid": "DataGen_r6QRQ"},{"bundle": "cn.piflow.bundle.flink.jdbc.JDBCWrite","customizedProperties": {},"name": "JDBCWrite__yYGc","properties": {"driver": "com.mysql.jdbc.Driver","password": "123456","properties": "{}","tableDefinition": "{\"tableBaseInfo\":{},\"physicalColumnDefinition\":[{\"columnName\":\"id\",\"columnType\":\"INT\",\"length\":null,\"precision\":null,\"scale\":null,\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"\"},{\"columnName\":\"name\",\"columnType\":\"STRING\",\"length\":null,\"precision\":null,\"scale\":null,\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"\"}],\"metadataColumnDefinition\":[],\"computedColumnDefinition\":[],\"watermarkDefinition\":{}}","tableName": "user","url": "jdbc:mysql://127.0.0.1:3306/test2?characterEncoding=utf8&autoReconnect=true&tinyInt1isBit=false&serverTimezone=Asia/Shanghai","username": "root"},"uuid": "JDBCWrite__yYGc"},{"bundle": "cn.piflow.bundle.flink.jdbc.JDBCRead","customizedProperties": {},"name": "JDBCRead_WREvh","properties": {"driver": "com.mysql.jdbc.Driver","fetchSize": "10","password": "123456","properties": "{}","tableDefinition": "{\"tableBaseInfo\":{\"catalogName\":\"\",\"dbname\":\"\",\"schema\":\"\",\"registerTableName\":\"t_user_source\",\"registerTableComment\":\"\",\"ifNotExists\":true,\"selectStatement\":\"\",\"likeStatement\":\"\"},\"physicalColumnDefinition\":[{\"columnName\":\"id\",\"columnType\":\"INT\",\"length\":null,\"precision\":null,\"scale\":null,\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"\"},{\"columnName\":\"name\",\"columnType\":\"STRING\",\"length\":null,\"precision\":null,\"scale\":null,\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"\"}],\"metadataColumnDefinition\":[],\"computedColumnDefinition\":[],\"watermarkDefinition\":{}}","tableName": "user","url": "jdbc:mysql://127.0.0.1:3306/test2?characterEncoding=utf8&autoReconnect=true&tinyInt1isBit=false&serverTimezone=Asia/Shanghai","username": "root","useTableEnv": "true"},"uuid": "JDBCRead_WREvh"},{"bundle": "cn.piflow.bundle.flink.common.SQLQuery","customizedProperties": {},"name": "SQLQuery_8jwT7","properties": {"registerResultViewName": "","registerSourceViewName": "","sql": "select id, name from paimon.test.t_user","useTableEnv": "true"},"uuid": "SQLQuery_8jwT7"},{"bundle": "cn.piflow.bundle.flink.common.SQLExecute","customizedProperties": {},"name": "SQLExecute_bMeVm","properties": {"sql": "create table if not exists paimon.test.t_user (\r\n id int,\r\n name string\r\n);\r\n\r\ninsert into paimon.test.t_user select * from t_user_source;","useTableEnv": "true"},"uuid": "SQLExecute_bMeVm"},{"bundle": "cn.piflow.bundle.flink.common.ShowData","customizedProperties": {},"name": "ShowData_aMIL3","properties": {"changeLog": "false","showNumber": "100"},"uuid": "ShowData_aMIL3"}],"uuid": "111"}
}
2.3 运行任务
- 点击 运行 按钮启动任务。
🔗 平台体验地址:DataStudio (http://1.94.182.15:8090)