当前位置: 首页 > news >正文

Flink读取mysql数据库(java)

代码如下:

package com.weilanaoli.ruge.vlink.flink;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.logging.Level;
import java.util.logging.Logger;class MysqlExample {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("xx.xx.xx.xx")    //输入地址.port(3306)                 //输入端口.databaseList("xx")         //输入库名.tableList("xx.test")       //输入表名.username("xx")             //输入用户名.password("xxxx")           //输入密码.startupOptions(StartupOptions.initial())  //读取binlog策略,这个启动选项有五种.deserializer(new JsonDebeziumDeserializationSchema()) //配置不要锁表,但是数据一致性不是精准一次,会变成最少一次.build();//配置执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000);DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");SingleOutputStreamOperator<Object> singleOutputStreamOperator = dataStreamSource.process(new ProcessFunction<String, Object>() {@Overridepublic void processElement(String value, ProcessFunction<String, Object>.Context ctx, Collector<Object> out) {try {System.out.println("processElement=====" + value);}catch (Exception e) {e.printStackTrace();}}});dataStreamSource.print("原始数据=====");env.execute("Print MySQL Snapshot + Binlog");}
}

运行结果如下:

http://www.lryc.cn/news/111525.html

相关文章:

  • 小程序学习(五):WXSS模板语法
  • 注解 @JsonFormat 与 @DateTimeFormat 的使用
  • Python实现决策树算法:完整源码逐行解析
  • Linux文本三剑客---grep、sed、awk
  • 局域网VoIP网络电话测试
  • el-table 去掉边框(修改颜色)
  • redis与MongoDB的区别
  • CSS设置高度
  • 开源免费用|Apache Doris 2.0 推出跨集群数据复制功能
  • 【docker】docker-compose服务编排
  • EdgeBox_tx1_A200 PyTorch v1.9.0 环境部署
  • 【雕爷学编程】MicroPython动手做(33)——物联网之天气预报
  • 分库分表之基于Shardingjdbc+docker+mysql主从架构实现读写分离 (三)
  • 探秘企业DevOps一体化平台建设终极形态丨IDCF
  • 百度智能创做AI平台
  • Python 开发工具 Pycharm —— 使用技巧Lv.1
  • zookeeper --- 高级篇
  • TypeScript【enum 枚举】
  • SpringBoot项目增加logback日志文件
  • 复习之selinux的管理
  • 无涯教程-Lua - 文件I/O
  • java+ssm民宿酒店客房推荐预订系统_2k78b--论文
  • Docker实战-关于Docker镜像的相关操作(一)
  • Jenkins Gerrit Trigger实践
  • Xcode protobuf2.5添加arm64编译器补丁生成静态库
  • 计算机毕设 深度学习疫情社交安全距离检测算法 - python opencv cnn
  • 四数之和——力扣18
  • Serializable 和 Externalizable区别?
  • 2023 电赛 E 题 K210 方案--K210实现矩形识别
  • 【雕爷学编程】MicroPython动手做(29)——物联网之SIoT 2