Hands-On: Syncing MySQL to Doris with Flink CDC

1. Create the MySQL database and table, and insert demo data

  1. Log in to the test MySQL instance
 mysql -u root -pnew_password
  2. Create the database and table, then insert the demo data
CREATE DATABASE emp_1;
USE emp_1;
CREATE TABLE employees_1 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);
INSERT INTO `employees_1` VALUES
(10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
(10036,'1959-08-10','Adamantios','Portugali','M','1992-01-03');

Note: the binlog must be enabled on MySQL, with the following settings in its configuration file:

  • log_bin=mysql_bin
  • binlog-format=Row
  • server-id=1
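
Before starting the CDC job it can help to confirm that these three settings are really present. The following is a minimal sketch, not part of the original walkthrough: the helper names check_setting and check_binlog_cnf are hypothetical, and the demo runs against a temporary sample file rather than a real my.cnf, whose location varies by installation. MySQL accepts both dashes and underscores in option names, so the patterns allow either.

```shell
#!/bin/sh
# Hypothetical helpers: check a my.cnf-style file for the binlog settings
# that the MySQL CDC source relies on.

check_setting() {
    # $1 = label to print, $2 = extended regex, $3 = config file
    if grep -Eiq "$2" "$3"; then
        echo "ok: $1"
    else
        echo "MISSING: $1"
        return 1
    fi
}

check_binlog_cnf() {
    cnf="$1"
    rc=0
    # log_bin may be given with or without a value
    check_setting 'log_bin' \
        '^[[:space:]]*log[-_]bin[[:space:]]*(=|$)' "$cnf" || rc=1
    # the binlog format must be ROW (case-insensitive)
    check_setting 'binlog-format=Row' \
        '^[[:space:]]*binlog[-_]format[[:space:]]*=[[:space:]]*row[[:space:]]*$' "$cnf" || rc=1
    # the server id must be a non-zero integer
    check_setting 'server-id' \
        '^[[:space:]]*server[-_]id[[:space:]]*=[[:space:]]*[1-9][0-9]*' "$cnf" || rc=1
    return "$rc"
}

# Demo against a temporary sample file (assumption: not a real my.cnf)
sample=$(mktemp)
printf '%s\n' '[mysqld]' 'log_bin=mysql_bin' 'binlog-format=Row' 'server-id=1' > "$sample"
check_binlog_cnf "$sample"
rm -f "$sample"
```

This only inspects the file. On a running server, SHOW VARIABLES LIKE 'log_bin'; and SHOW VARIABLES LIKE 'binlog_format'; report the values that are actually in effect.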

2. Create the Doris database and table

  1. Create the Doris table
mysql -uroot -P9030 -h127.0.0.1
create database demo;
use demo;
CREATE TABLE all_employees_info (
    emp_no       int NOT NULL,
    birth_date   date,
    first_name   varchar(20),
    last_name    varchar(20),
    gender       char(2),
    hire_date    date
)
UNIQUE KEY(`emp_no`, `birth_date`)
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

3. Start Flink

  1. Start Flink
cd /mnt/apps/flink-1.15.3/
# Start Flink (in this environment the service was already running)
bin/start-cluster.sh
# Open the SQL client
bin/sql-client.sh embedded
  2. Create the Flink job:
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE employees_source (
    database_name STRING METADATA VIRTUAL,
    table_name STRING METADATA VIRTUAL,
    emp_no int NOT NULL,
    birth_date date,
    first_name STRING,
    last_name STRING,
    gender STRING,
    hire_date date,
    PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'new_password',
    'database-name' = 'emp_1',
    'table-name' = 'employees_1'
);

CREATE TABLE cdc_doris_sink (
    emp_no       int,
    birth_date   STRING,
    first_name   STRING,
    last_name    STRING,
    gender       STRING,
    hire_date    STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = '172.16.64.9:8030',
    'table.identifier' = 'demo.all_employees_info',
    'username' = 'root',
    'password' = '',
    'sink.properties.two_phase_commit' = 'true',
    'sink.label-prefix' = 'doris_demo_emp_002'
);

INSERT INTO cdc_doris_sink (emp_no, birth_date, first_name, last_name, gender, hire_date)
SELECT emp_no, CAST(birth_date AS STRING) AS birth_date, first_name, last_name, gender, CAST(hire_date AS STRING) AS hire_date
FROM employees_source;
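  3. Open the following address to view the Flink job
    http://localhost:8081/#/job/running

  4. Verify the data: once the job is running, rows arrive in Doris in real time. (The row with emp_no 20001 in the output below is not part of the demo INSERT in step 1; it was presumably written separately.)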

mysql -uroot -P9030 -h127.0.0.1
mysql> select * from demo.all_employees_info;
+--------+------------+------------+-----------+--------+------------+
| emp_no | birth_date | first_name | last_name | gender | hire_date  |
+--------+------------+------------+-----------+--------+------------+
|  10001 | 1953-09-02 | Georgi     | Facello   | M      | 1986-06-26 |
|  10002 | 1964-06-02 | Bezalel    | Simmel    | F      | 1985-11-21 |
|  10036 | 1959-08-10 | Adamantios | Portugali | M      | 1992-01-03 |
|  20001 | 1953-09-02 | Georgi     | Facello   | M      | 1986-06-26 |
+--------+------------+------------+-----------+--------+------------+
4 rows in set (0.02 sec)
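
The query above mostly reflects the initial snapshot. To check that later changes are also picked up, one option is to modify a few rows in MySQL and query Doris again. The sketch below only prints the SQL so it can be reviewed first; the function name emit_cdc_smoke_sql and the inserted row (emp_no 10050) are hypothetical and not part of the original demo data.

```shell
#!/bin/sh
# Hypothetical helper: print a small set of changes to apply on the MySQL side.
emit_cdc_smoke_sql() {
    # Quoted heredoc delimiter: the SQL is printed verbatim, nothing is expanded.
    cat <<'SQL'
USE emp_1;
-- Made-up row for illustration: should show up in Doris as a new row.
INSERT INTO employees_1 VALUES (10050, '1970-01-15', 'Test', 'Insert', 'F', '1995-03-01');
-- Change a non-key column only: the Doris row with the same
-- (emp_no, birth_date) key should be overwritten in place.
UPDATE employees_1 SET last_name = 'Simmel-Updated' WHERE emp_no = 10002;
SQL
}

emit_cdc_smoke_sql
```

To apply the changes, pipe the output into the client used in step 1, for example emit_cdc_smoke_sql | mysql -u root -pnew_password. With the 10s checkpoint interval set in the Flink job, the changes would be expected to become visible in Doris after roughly one checkpoint.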

Link

  • https://zhuanlan.zhihu.com/p/532913664
  • https://www.runoob.com/mysql/mysql-install.html
  • https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.15/1.2.1/
  • https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/

JAR download locations:

Flink environment: 1.15.3

  • https://dlcdn.apache.org/flink/flink-1.15.3/flink-1.15.3-bin-scala_2.12.tgz
    Extract the archive, then place the two JARs below in Flink's lib directory.

flink-doris-connector (build for Flink 1.15):

  • https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.15/1.2.1/flink-doris-connector-1.15-1.2.1.jar

MySQL CDC connector (flink-sql-connector-mysql-cdc-2.2.1.jar):

  • https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
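
Placing the two connector JARs in the lib directory could be scripted roughly as follows. This is a sketch under assumptions: FLINK_HOME defaults to the install path used in step 3, the fetch_jar helper and the DRY_RUN switch are hypothetical, and by default the script only prints the curl commands instead of downloading anything.

```shell
#!/bin/sh
# Assumption: Flink is installed at the path used earlier in this walkthrough.
FLINK_HOME="${FLINK_HOME:-/mnt/apps/flink-1.15.3}"
# DRY_RUN=1 (default) prints the commands; set DRY_RUN=0 to really download.
DRY_RUN="${DRY_RUN:-1}"

# Hypothetical helper: download one JAR into $FLINK_HOME/lib.
fetch_jar() {
    url="$1"
    dest="$FLINK_HOME/lib/$(basename "$url")"
    if [ "$DRY_RUN" = "1" ]; then
        echo "curl -fL -o $dest $url"
    else
        # -f: fail on HTTP errors, -L: follow redirects, -o: output file
        curl -fL -o "$dest" "$url"
    fi
}

fetch_jar "https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.15/1.2.1/flink-doris-connector-1.15-1.2.1.jar"
fetch_jar "https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar"
```

Flink loads the contents of lib at startup, so a running cluster and SQL client need to be restarted after the JARs are added.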