当前位置: 首页 > news >正文

flink:通过table api把文件中读取的数据写入MySQL

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作

package cn.edu.tju.demo2;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.types.Row;public class Test41 {//demo 是MySQL中已经创建好的表//create table demo (userId varchar(50) not null,total bigint,avgVal double);private static String FILE_PATH = "info.txt";public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.connect(new FileSystem().path(FILE_PATH)).withFormat(new Csv()).withSchema(new Schema().field("userId", DataTypes.VARCHAR(50)).field("ts", DataTypes.INT()).field("val", DataTypes.DOUBLE())).createTemporaryTable("input");Table dataTable = tableEnv.from("input");Table aggregateTable = dataTable.groupBy("userId").select("userId, userId.count as total, val.avg as avgVal");String sql="create table jdbcOutputTable (" +" userId varchar(50) not null,total bigint,avgVal double " +") with (" +" 'connector.type' = 'jdbc', " +" 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/test', " +" 'connector.table' = 'demo', " +" 'connector.driver' = 'com.mysql.jdbc.Driver', " +" 'connector.username' = 'root', " +" 'connector.password' = 123456' )";tableEnv.sqlUpdate(sql);aggregateTable.insertInto("jdbcOutputTable");tableEnv.execute("my job");}
}

文件info.txt

user1,1680000890,31.6
user2,1681111900,38.3
user1,1680000890,34.9
http://www.lryc.cn/news/318417.html

相关文章:

  • 【Java 多线程 哈希表】 HashTable, HashMap, ConcurrentHashMap 之间的区别
  • 有趣之matlab-烟花
  • C语言指针与数组(不适合初学者版):一篇文章带你深入了解指针与数组!
  • springboot Mongo大数据查询优化方案
  • Ollama管理本地开源大模型,用Open WebUI访问Ollama接口
  • Linux--基本知识入门
  • 基于springboot+vue实现的大学计算机课程管理平台的设计与实现(全套资料)
  • LeetCode2115. 从给定原材料中找到所有可以做出的菜
  • 项目性能优化—性能优化的指标、目标
  • 蓝桥杯刷题(三)
  • 20240312-算法复习打卡day21||● 530.二叉搜索树的最小绝对差 ● 501.二叉搜索树中的众数 ● 236. 二叉树的最近公共祖先
  • 今天我们来学习一下关于MySQL数据库
  • 长期护理保险可改善老年人心理健康 | CHARLS CLHLS CFPS 公共数据库周报(3.6)...
  • 49、C++/友元、常成员函数和常对象、运算符重载学习20240314
  • SQL Server错误:15404
  • Halcon文件操作
  • 【测试知识】业务面试问答突击版1
  • 使用el-row及el-col页面缩放时出现空行解决方案
  • java中几种对象存储(文件存储)中间件的介绍
  • 网络工程师——2024自学
  • SwiftUI的Picker
  • 物联网技术助力智慧城市转型升级:智能、高效、可持续
  • YOLOv7_pose-Openvino和ONNXRuntime推理【CPU】
  • 通过ACPI检测沙箱-反虚拟机
  • 计算点集的最小外接矩形——OpenCV的minAreaRect函数
  • Stripe Web 购买集成
  • 加密货币在网络违法犯罪活动中的利用情况调查
  • 【测试知识】业务面试问答突击版3---bug、测试用例设计
  • 使用大型语言模型进行实体提取
  • 基础:TCP是什么?