当前位置: 首页 > news >正文

Flink CDC系列之:理解学习YARN模式

Flink CDC系列之:理解学习YARN模式

  • 准备
  • 会话模式
  • 在 YARN 上启动 Flink 会话
  • 设置 Flink CDC
  • 提交 Flink CDC Job

Apache Hadoop YARN 是许多数据处理框架中流行的资源提供者。Flink 服务提交给 YARN 的 ResourceManager,后者在由 YARN NodeManagers 管理的机器上生成容器。Flink 将其 JobManager 和 TaskManager 实例部署到此类容器中。

Flink 可以根据在 JobManager 上运行的作业所需的处理槽数量动态分配和取消分配 TaskManager 资源。

准备

本入门部分假设从版本 2.10.2 开始有一个可运行的 YARN 环境。最方便的方法是使用 Amazon EMR、Google Cloud DataProc 或 Cloudera 等产品等服务来提供 YARN 环境。不建议在本地或集群上手动设置 YARN 环境以完成本入门教程。

  • 通过运行 yarn top 确保您的 YARN 集群已准备好接受 Flink 应用程序。它应该不会显示任何错误消息。
  • 从下载页面下载最新的 Flink 发行版并解压。
  • 重要提示确保已设置 HADOOP_CLASSPATH 环境变量(可以通过运行 echo $HADOOP_CLASSPATH 进行检查)。如果没有,请使用以下命令进行设置。
export HADOOP_CLASSPATH=`hadoop classpath`

会话模式

Flink 可在所有类 UNIX 环境中运行,即 Linux、Mac OS X 和 Cygwin(适用于 Windows)。

可以参考概述检查支持的版本并下载 Flink 的二进制版本,然后提取存档:

tar -xzf flink-*.tgz

应该设置 FLINK_HOME 环境变量,例如:

export FLINK_HOME=/path/flink-*

在 YARN 上启动 Flink 会话

一旦确保已设置 HADOOP_CLASSPATH 环境变量,即可在 YARN 会话上启动 Flink:

# we assume to be in the root directory of 
# the unzipped Flink distribution# export HADOOP_CLASSPATH
export HADOOP_CLASSPATH=`hadoop classpath`# Start YARN session
./bin/yarn-session.sh --detached# Stop YARN session (replace the application id based 
# on the output of the yarn-session.sh command)
echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX

启动 YARN 会话后,现在可以通过命令输出最后几行中打印的 URL 或通过 YARN ResourceManager Web UI 访问 Flink Web UI。

然后,需要向 flink-conf.yaml 添加一些配置:

rest.bind-port: {{REST_PORT}}
rest.address: {{NODE_IP}}
execution.target: yarn-session
yarn.application.id: {{YARN_APPLICATION_ID}}

{{REST_PORT}} 和 {{NODE_IP}} 应替换为 JobManager Web 界面的实际值,{{YARN_APPLICATION_ID}} 应替换为 Flink 的实际 YARN 应用程序 ID。

设置 Flink CDC

从发布页面下载 Flink CDC 的 tar 文件,然后提取存档:

tar -xzf flink-cdc-*.tar.gz

解压后的 flink-cdc 包含四个目录:bin、lib、log 和 conf。

从发布页面下载连接器 jar,并将其移动到 lib 目录。

下载链接仅适用于稳定版本,SNAPSHOT 依赖项需要您根据特定分支自行构建。

提交 Flink CDC Job

下面是同步整个数据库的示例文件mysql-to-doris.yaml:

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""pipeline:name: Sync MySQL Database to Dorisparallelism: 2

需要根据自己的需求修改配置文件。最后使用Cli将作业提交到Flink Standalone集群。

cd /path/flink-cdc-*
./bin/flink-cdc.sh mysql-to-doris.yaml

提交成功后返回信息如下:

Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris

可以通过 Flink Web UI 找到正在运行的名为 Sync MySQL Database to Doris 的作业。

请注意,目前不支持提交到应用程序模式集群和 per-job 模式集群。

http://www.lryc.cn/news/471295.html

相关文章:

  • langgraph入门
  • 【Python】爬虫程序打包成exe
  • 【力扣专题栏】两两交换链表中的节点,如何实现链表中两两相邻节点的交换?
  • 埋点采集的日志数据常见的格式简介
  • 基于SSM高考志愿辅助填报系统设计与实现
  • elasticsearch 8.x 插件安装(六)之Hanlp插件
  • 排序算法简记
  • Stable diffusion inference 多卡并行
  • Docker:namespace环境隔离 CGroup资源控制
  • 鼠标增强工具 MousePlus v5.3.9.0 中文绿色版
  • Android 圆形进度条CircleProgressView 基础版
  • 理解磁盘结构---CHS---LAB---文件系统
  • 我在1024谈华为
  • NVR小程序接入平台/设备EasyNVR多品牌NVR管理工具/设备视频监控解决方案
  • 二叉树前序遍历的 Java 实现,包括递归和非递归两种方式
  • QT开发:构建现代UI的利器:深入详解QML和Qt Quick基础开发技术
  • vue前端使用pdfjs与pdfdist-mergeofd 实现预览pdf并翻页,同时解决预览pdf显示模糊的问题
  • C语言——回调函数
  • 2016年ATom-1飞行活动期间以10秒间隔进行的一氧化碳(CO)观测数据
  • MLM之Emu3:Emu3(仅需下一个Token预测)的简介、安装和使用方法、案例应用之详细攻略
  • Spring Boot与Flyway实现自动化数据库版本控制
  • input角度:I2C触摸屏驱动分析和编写一个简单的I2C驱动程序
  • SQL-lab靶场less1-4
  • 【生成模型之二】diffusion model模型
  • 记录 Maven 版本覆盖 Bug 的解决过程
  • 【K8S系列】Kubernetes Service 基础知识 详细介绍
  • python在物联网领域的数据应用分析与实战!
  • 目标跟踪算法-卡尔曼滤波详解
  • SpringBoot后端开发常用工具详细介绍——application多环境配置与切换
  • php反序列化漏洞典型例题