当前位置: 首页 > news >正文

flink-cdc同步数据到doris中

1 创建数据库和表

1.1 数据库脚本

这样直接创建数据库是有问题,因为后面发现superset连接使用doris://root:123456@10.101.12.82:9030/internal.eayc?charset=utf8mb4

-- 创建数据库eayc
create database if not exists ods_eayc;
-- 创建数据表

1

2 数据同步

2.1 flnk-cdc

参考Flink CDC实时同步MySQL到Doris
Flink CDC 概述

2.1.1 最简单的单表同步

从下面的yml脚本可以看到,并没有doris中创建eayc_user表,应该是flink-cdc自动创建的。

#Mysql的参数配置
source:type: mysqlhostname: 10.101.10.11port: 3306username: flinkpassword: 123456tables: eayc.eayc_userserver-id: 5400# server-time-zone: UTC
#Doris的参数配置
sink:type: dorisfenodes: 10.101.11.2:8030,10.101.11.2:8030,10.101.11.3:8030username: rootpassword: 123456table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1route:- source-table: eayc.eayc_usersink-table: ods_eayc.eayc_user
pipeline:name: eayc to dorisparallelism: 1

注意连接mysql的server-id的要唯一,否则提示下面的错误

A slave with the same server_uuid/server_id as this slave has connected to the master...
The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.

进入到flink的界面查看到错误日志,任务执行失败。下面报的错是mysql时区与flink配置不匹配。现在改生产库影响未知,不敢动,于是去掉server-time-zone: UTC设置。重新执行任务。
1

1
此时任务可以正常执行了,数据也可以正常过来了。因为flink-cdc是根据binlog,因此mysql变更,doris中的数据也实时更新过来。
1

2.1.2 多表同步

如下配置

source:tables: eayc.eayc_user,eayc.eayc_company,eayc.eayc_company_user
route:- source-table: eayc.eayc_usersink-table: ods_eayc.eayc_user- source-table: eayc.eayc_companysink-table: ods_eayc.eayc_company- source-table: eayc.eayc_company_usersink-table: ods_eayc.eayc_company_user

下面这种方式不支持,会报下面的错误:

Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.lang.String` from Array value (token `JsonToken.START_ARRAY`)at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: java.util.LinkedHashMap["tables"])

1

2.1.3 分表导入

taskmanager.numberOfTaskSlots默认为1,slot不够,就报下面的错误,因为是16C32G,于是我改成了8,parallelism.default默认也是1,我也改成了8,启动之后,没有报下面的错误,但是之前执行的任务没有了。

2025-02-19 15:05:07
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.at 

如果mysql的表没有主键,则报下面的错误,这个时候就需要修正原mysql表数据。

Caused by: org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.

doris权限问题,这个是FE集群有问题,更改过来就好了。

reason: SchemaChange request error with Failed to schemaChange, response: {"msg":"Unauthorized","code":401,"data":"Access denied for user 'root@10.101.12.90' (using password: YES)","count":0}

可以看到下面,要获取acc的全部表,但是有一些是做了分表,需合并到其中doris的一张表里面,这个规则是有效的,开始parallelism: 1,我以为有一异常,只同步了一张表,过了几分钟才发现其他表也陆续进来。

source:tables: acc.\.*
route:- source-table: acc.acc_account_balance_\.*sink-table: acc.acc_account_balance- source-table: acc.acc_account_subject_\.*sink-table: acc.acc_account_subject- source-table: acc.acc_initial_balance_\.*sink-table: acc.acc_initial_balance- source-table: acc.acc_voucher_\.*sink-table: acc.acc_voucher- source-table: acc.acc_voucher_entry_\.*sink-table: acc.acc_voucher_entry    

于是将parallelism: 4,很快后台又抛异常。

java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

于是调整

taskmanager.memory.process.size: 8192m  # 增加 TaskManager 的内存

Flink CDC并行执行,会出现数据越界的问题。
Flink CDC报错ArrayIndexOutOfBoundsException解决思路

2.2 flink安装

2.2.1 单节点
tar -zxvf flink-1.18.0-bin-scala_2.12.tgz
# 配置环境变量
vi /etc/profile
export JAVA_HOME=/appdata/jdk1.8.0_181
export CLASSPATH=$JAVA_HOME/lib
export FLINK_HOME=/appdata/flink/flink-1.18.0
export PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH
# 生效
source /etc/profile
# flink配置
vim conf/flink-conf.yaml
execution.checkpointing.interval: 3000
rest.bind-address: 0.0.0.0
cd bin
./start-cluster.sh
#
tar -zxvf flink-cdc-3.0.0-bin.tar.gz
# 执行任务
cd /appdata/flink/flink-cdc-3.0.0
bash bin/flink-cdc.sh /appdata/flink/job/eayc_to_doris.yml

flink-1.18.0
flink-cdc-3.0.0
mysql pipeline connector 3.0.0
doris pipeline connector 3.0.0
将上面两个connector放到cdc的lib目录
1

2.2.2 监控

1

1

http://www.lryc.cn/news/539714.html

相关文章:

  • Kubernetes:EKS 中 Istio Ingress Gateway 负载均衡器配置及常见问题解析
  • Golang教程
  • AI 百炼成神:线性回归,预测房价
  • 企业软件合规性管理:构建高效、安全的软件资产生态
  • 每日一题——编辑距离
  • TensorFlow项目GPU运行 安装步骤
  • c++进阶———继承
  • FreeSwitch的mod_translate模块详细,附带场景案例及代码示例
  • 前端504错误分析
  • 在 .NET 8/9 中使用 AppUser 进行 JWT 令牌身份验证
  • 基于python实现机器学习的心脏病预测系统
  • 使用 NVM 随意切换 Node.js 版本
  • 【Prometheus】prometheus结合pushgateway实现脚本运行状态监控
  • SpringBoot 项目配置动态数据源
  • CSS基本选择器
  • idea-代码补全快捷键
  • 基于SpringBoot+vue粮油商城小程序系统
  • 挪车小程序挪车二维码php+uniapp
  • 企业内部知识库:安全协作打造企业智慧运营基石
  • 网络安全推荐的视频教程 网络安全系列
  • 麒麟管家全新升级,运维问题“一键修复”
  • MVCC(多版本并发控制)机制讲解
  • React 与 Vue 对比指南 - 上
  • 开源协议深度解析:理解MIT、GPL、Apache等常见许可证
  • 通用评估系统(五)- 前端部分总体说明
  • STM32GPIO
  • MyBatis拦截器终极指南:从原理到企业级实战
  • SpringBoot启动失败之application.yml缩进没写好
  • 【JavaScript】《JavaScript高级程序设计 (第4版) 》笔记-Chapter17-事件
  • 鸿蒙开发:V2版本装饰器之@Monitor装饰器