当前位置: 首页 > news >正文

Flink Table API 读写MySQL

Flink Table API 读写 MySQL

import org.apache.flink.connector.jdbc.table.JdbcConnectorOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;import static org.apache.flink.table.api.Expressions.$;public class TableApiMysql {public static void main(String[] args) {EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();TableEnvironment tableEnv = TableEnvironment.create(settings);Schema schema = Schema.newBuilder().column("user_id", DataTypes.BIGINT()).column("user_name", DataTypes.STRING()).build();TableDescriptor tableDescriptor = TableDescriptor.forConnector("jdbc").option(JdbcConnectorOptions.URL, "jdbc:mysql://localhost:3306/tmp").option(JdbcConnectorOptions.USERNAME, "root").option(JdbcConnectorOptions.PASSWORD, "123456").option(JdbcConnectorOptions.TABLE_NAME, "test").schema(schema).build();tableEnv.createTable("source", tableDescriptor);// 通过API执行selectSystem.out.println("select format 1: ");tableEnv.from("source").select($("user_id"), $("user_name")).execute().print();// 写入mysql user_id是自增主键tableEnv.executeSql("insert into source(user_name) select 'hello'");// 直接SQL执行select *System.out.println("select format 2: ");Table table = tableEnv.sqlQuery("select * from source");TableResult execute = table.execute();execute.print();}
}
http://www.lryc.cn/news/241940.html

相关文章:

  • Nginx 开源版安装
  • 『亚马逊云科技产品测评』活动征文|低成本搭建物联网服务器thingsboard
  • 【Pytorch】Visualization of Feature Maps(3)
  • 人工智能对我们的生活影响
  • Mysql存储引擎分类
  • 基于Python+TensorFlow+Django的交通标志识别系统
  • 【Java 进阶篇】Jedis:让Java与Redis轻松对话的利器
  • 【数据分享】我国12.5米分辨率的DEM地形数据(免费获取/地理坐标系)
  • C++设计模式之策略模式
  • spring-webflux的一些概念的理解
  • OpenCV快速入门:特征点检测与匹配
  • 旋转的数组
  • Hive VS Spark
  • SAST静态分析工具所支持的规则
  • torch 的数据加载 Datasets DataLoaders
  • 【Promise】某个异步方法执行结束后 在执行下面方法
  • 任意文件下载漏洞(CVE-2021-44983)
  • C++(20):通过source_location实现日志函数
  • 【数据结构】树与二叉树(廿二):树和森林的遍历——后根遍历(递归算法PostOrder、非递归算法NPO)
  • 精通Nginx(17)-安全管控之防暴露、限制访问、防DDos攻击、防爬虫、防非法引用
  • STM32 Flash
  • 文件批量重命名技巧:图片文件名太长怎么办?告别手动改名方法
  • 微信小程序手写滑动tab
  • 一文读懂如何安全地存储密码
  • 【运维面试100问】(六)buffer和cache的区别
  • 创建域名邮箱邮件地址的方法与步骤
  • Qt框架学习(1)
  • 3D电路板在线渲染案例
  • ResizeObserver loop limit exceeded报错解决方案
  • 【OpenCV实现图像:使用OpenCV进行图像处理之透视变换】