当前位置: 首页 > news >正文

Flink-DataWorks第四部分:数据同步(第60天)

系列文章目录

2.4.2 DataStudio侧实时同步
2.4.3 数据集成侧同步任务

文章目录

  • 系列文章目录
  • 前言
      • 2.4.2 DataStudio侧实时同步
      • 2.4.3 数据集成侧同步任务


前言

本文主要详解了DataWorks的数据同步,为第四部分:
由于篇幅过长,分章节进行发布。
后续:
 数据开发

2.4.2 DataStudio侧实时同步

DataWorks为用户提供的实时数据同步功能,方便用户使用单表或整库同步方式,将源端数据库中部分或全部表的数据变化实时同步至目标数据库中,实现目标库实时保持和源库的数据对应。
使用限制:
 实时同步不支持在数据开发界面运行任务,用户需要保存、提交实时同步节点后,在生产环境运维中心运行该节点。
 实时同步仅支持运行在独享数据集成资源组上。
 实时同步任务不支持同步视图。
 目前支持的同步方式有:
在这里插入图片描述

步骤一:创建实时同步节点
(1)创建实时同步节点
可以通过以下两种方式创建实时同步节点。
 方式一:展开业务流程,右键单击数据集成 > 新建节点 > 实时同步。
 方式二:双击业务流程名称,将数据集成目录下的实时同步节点直接拖拽至右侧业务流程编辑面板。
(2)在新建节点对话框中,配置各项参数
同步方式设置为数据库变更数据同步到MaxCompute
路径设置为业务流程/test/数据集成
名称设置为stream_data_integration
步骤二:配置实时同步任务
(1)设置同步来源和规则
设置类型为MySQL,数据源设置为rdsmysql
选择同步的源表,这里勾选student_list表,然后添加到右侧
在这里插入图片描述

设置表(库名映射规则):设置表(表(库)名映射规则支持正则表达式转换,比如需要将名称为 “table_01”,“table_02”,“table_03” 同步到一张叫 “my_table” 的表,可以配置正则表名转换规则:源:table.* > 目标 my_table)名映射规则
因为这里是单表采集,所以无需设置
点击下一步
(2)设置目标表
目标MaxCompute数据源设置为odps_first
写入模式为实时直接写增量表
时间自动分区设置:分区表
在这里插入图片描述

点击编辑,修改分区间隔为天
刷新源表和MaxCompute表映射
然后点击下一步
(3)因为是新表,所以需要自动建表,点击开始建表
在这里插入图片描述

(4)设置表粒度同步规则
点击配置DML,可以设置插入、更新、删除的策略,是选择还是正常处理
在这里插入图片描述

一般选择正常处理即可,点击下一步
(5)DDL消息处理规则
对于关系型数据的实时同步,其原始实时信息会包含DDL操作,此处可以设置针对于这些DDL消息同步到目标表时的操作。
【注意】此处策略仅仅是本任务初次启动时的规则,此后用户可以在对应的实时任务运维页面中,通过停止任务,修改DDL规则,再启动的方式,应用新的DDL策略。
在这里插入图片描述

各个处理策略的含义:
“正常处理”,此DDL消息将会继续下发给目标数据源,由目标数据源来处理,不同目标数据源处理策略可能会不同。
“忽略”:丢弃掉此DDL消息,不再向目标数据源发送此消息。
“出错”:直接让实时同步任务以出错状态终止运行。
保持默认即可。
(6)运行资源设置
资源组选择hmcx
Tunnel资源组选择公共传输资源
点击完成配置。
步骤三:提交并发布实时同步任务
(1)单击工具栏中的保存图标,保存节点。
(2)单击工具栏中的提交图标,提交节点任务。
在提交新版本对话框中,输入变更描述。
单击确定。
(3)任务提交成功后,需要将任务发布至生产环境进行发布。单击顶部菜单栏右侧的任务发布。
在这里插入图片描述

选择任务节点,进行发布。
(4)点击运维中心,在实时任务运维的实时同步任务中可以看到提交上来的任务
点击启动,可以启动同步任务
在这里插入图片描述

(5)启动成功后,稍等片刻
在MySQL中输入
insert into test.student_list values (‘09’, ‘李博’, ‘1995-05-24’, ‘男’);
然后在dataworks
set odps.sql.allow.fullscan=true;

select * from dwhmcx.rdsmysql_test_student_list;
可以看到同步的消息。
在这里插入图片描述

2.4.3 数据集成侧同步任务

数据集成基于源端数据库与目标端数据库类型为用户提供丰富的数据同步任务,同步类型包括:整库离线同步(一次性全量同步、周期性全量同步、离线全增量同步、一次性增量同步、周期性增量同步)、一键实时同步(一次性全量同步,实时增量同步)。不同源端与目标端数据库支持的同步方案不同,具体支持的方案详情请参考产品界面。

步骤一:创建同步任务
打开数据集成,然后在同步任务中,创建同步任务,填写来源和去向。
这里来源选择MySQL,去向选择MaxCompute,然后点击开始创建。
在这里插入图片描述

步骤二:选择同步方案
填写新任务名称,full_increment_integration
选择同步类型,目前支持的类型有以下几种。这里选择整库全增量(准实时)
在这里插入图片描述

责任人选择xxx111
步骤三:网络与资源配置
按照下图进行配置,测试连通性没有问题后点击下一步
在这里插入图片描述

步骤四:设置同步来源和规则
基本配置和数据来源不用修改
选择同步的源表中,选择test库,然后添加到右侧
在这里插入图片描述

设置表(库)名映射规则
点击添加目标表名规则,设置目标表名规则为:full_increment_${db_table_name_src_transed}
在这里插入图片描述

可以使用的内置变量有:
${db_table_name_src_transed}:“源表名和目标表名转换规则”中的转换完成之后的表名。
${db_name_src_transed}:“和转换规则”中的转换完成之后的。
${ds_name_src}:源数据源名。
步骤五:设置目标表
设置“时间自动分区设置”为非分区表

在这里插入图片描述

然后刷新源表和MaxCompute表映射
然后点击下一步
步骤六:设置表粒度同步规则
Base表Merge设置:
每张表的实时同步数据会在其全量离线任务完成后,实时的写入MaxCompute(Log表),然后会经过拆分(Split)变为Delta表,最后再与Base表进行合并(Merge),最终结果会写入Base表中。默认情况下会根据用户在上一步中指定的MaxCompute(ODPS)时间分区来做Merge,例如用户指定的分区是“天”,则会每天进行一次merge,也就意味着Base表的数据更新需要等到第二天。但是由于每个表的大小不一样,所以全量离线同步任务完成的时间有先后,如果用户想指定某些表在完成全量离线同步之后提早进行merge动作,以便及时获得最新的Base表数据,那么用户可以在本页中进行设置。

全量任务离线任务已经完成后,将最新实时同步的结果Merge进入Base表中
按小时独立:每一小时进行一次merge动作。
按天独立:每一天进行一次merge动作。
按分区周期:按照maxcompute表的分区间隔进行merge动作,前提是上一个周期的merge已经完成。

这里修改为按天独立,点击下一步
在这里插入图片描述

步骤七:设置表粒度同步规则
对于关系型数据的实时同步,其原始实时信息会包含DDL操作,此处可以设置针对于这些DDL消息同步到目标表时的操作。
【注意】此处策略仅仅是本任务初次启动时的规则,此后用户可以在对应的实时任务运维页面中,通过停止任务,修改DDL规则,再启动的方式,应用新的DDL策略。
保持默认即可,点击下一步
在这里插入图片描述

步骤八:运行资源设置
这里可以修改资源组
连通性测试没有问题后,勾选立即执行,完成配置

此时会生成两个任务,一个离线,一个实时。离线用来同步全量任务,而实时用来同步增量任务。

任务列表中也可以查到刚刚创建的任务

在ODPS中即可查看结果:
select * from dwhmcx.full_increment_student_list;

如果任务需要停止,则需要到运维中心的实时同步任务中进行停止。
在这里插入图片描述

http://www.lryc.cn/news/418702.html

相关文章:

  • go post请求,参数是raw json格式,response是固定结构。
  • 国产开源大模型都有哪些?
  • 基于Hadoop的超市进货推荐系统设计与实现【springboot案例项目】
  • ChatGPT能从这几个方面提升学术论文质量
  • Python3的安装及基础指令
  • 使用Spring与JDK动态代理实现事务管理
  • 服务器硬件及RAID配置
  • 【经验总结】ShardingSphere5.2.1 + Springboot 快速开始
  • 基于Golang实现Kubernetes边车模式
  • TCP 通信全流程分析:从连接建立到数据传输的深度探索
  • 4、提取H264码流中nalu
  • 哈佛大学单细胞课程|笔记汇总 (二)
  • java中抽象类和接口的区别
  • Spring Boot - 在Spring Boot中实现灵活的API版本控制(下)_ 封装场景启动器Starter
  • EasyCVR视频转码:T3视频平台不支持GB28181协议,应该如何实现与视频联网平台的对接与视频共享呢?
  • Spring统一处理请求响应与异常
  • SqlServer公用表表达式 (CTE) WITH common_table_expression
  • 常见中间件漏洞
  • elasticsearch的学习(二):Java api操作elasticsearch
  • docker 部署 ElasticSearch;Kibana
  • k8s使用kustomize来部署应用
  • 基于开源FFmpeg和SDL2.0的音视频解码播放和存储系统的实现
  • 保姆级教程,一文了解LVS
  • 【STM32】DMA数据转运(存储器到存储器)
  • 【Android】通过代码打开输入法
  • 爬虫集群部署:Scrapyd 框架深度解析
  • pytorch GPU操作事例
  • linux常见性能监控工具
  • C++ | Leetcode C++题解之第331题验证二叉树的前序序列化
  • 【多模态处理】利用GPT逐一读取本地图片并生成描述并保存,支持崩溃后从最新进度恢复