当前位置: 首页 > news >正文

Flink CDC系列之:理解学习Kubernetes模式

Flink CDC系列之:理解学习Kubernetes模式

  • 准备
  • 会话模式
  • 启动会话集群
  • 设置 Flink CDC
  • 提交 Flink CDC Job

Kubernetes 是一种流行的容器编排系统,用于自动化计算机应用程序的部署、扩展和管理。Flink 的原生 Kubernetes 集成允许您直接在正在运行的 Kubernetes 集群上部署 Flink。此外,由于 Flink 可以直接与 Kubernetes 通信,因此它能够根据所需资源动态分配和取消分配 TaskManager。

Apache Flink 还提供了一个 Kubernetes 运算符,用于管理 Kubernetes 上的 Flink 集群。它支持独立和原生部署模式,并大大简化了 Kubernetes 上 Flink 资源的部署、配置和生命周期管理。

准备

本文档假设正在运行的 Kubernetes 集群满足以下要求:

  • Kubernetes >= 1.9。
  • KubeConfig,有权列出、创建、删除 pod 和服务,可通过 ~/.kube/config 进行配置。可以通过运行 kubectl auth can-i <list|create|edit|delete> pods 来验证权限。
  • 启用 Kubernetes DNS。
  • 具有 RBAC 权限的默认服务帐户可以创建、删除 pod。

会话模式

Flink 可在所有类 UNIX 环境中运行,即 Linux、Mac OS X 和 Cygwin(适用于 Windows)。

可以参考概述来检查支持的版本并下载 Flink 的二进制版本,然后提取存档:

tar -xzf flink-*.tgz

应该设置 FLINK_HOME 环境变量,例如:

export FLINK_HOME=/path/flink-*

启动会话集群

要在 k8s 上启动会话集群,请运行 Flink 附带的 bash 脚本:

cd /path/flink-*
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster

启动成功后返回信息如下:

org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
org.apache.flink.kubernetes.utils.KubernetesUtils            [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink session cluster my-first-flink-cluster successfully, JobManager Web Interface: http://my-first-flink-cluster-rest.default:8081

然后,需要将这两个配置添加到您的 flink-conf.yaml 中:

rest.bind-port: {{REST_PORT}}
rest.address: {{NODE_IP}}

{{REST_PORT}} 和 {{NODE_IP}} 应该被你的 JobManager Web 界面的实际值替换。

设置 Flink CDC

从发布页面下载 Flink CDC 的 tar 文件,然后提取存档:

tar -xzf flink-cdc-*.tar.gz

解压后的 flink-cdc 包含四个目录:bin、lib、log 和 conf。

从发布页面下载连接器 jar,并将其移动到 lib 目录。

下载链接仅适用于稳定版本,SNAPSHOT 依赖项需要您根据特定分支自行构建。

提交 Flink CDC Job

下面是同步整个数据库的示例文件mysql-to-doris.yaml:

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""pipeline:name: Sync MySQL Database to Dorisparallelism: 2

需要根据自己的需求修改配置文件。最后使用Cli将作业提交到Flink Standalone集群。

cd /path/flink-cdc-*
./bin/flink-cdc.sh mysql-to-doris.yaml

提交成功后返回信息如下:

Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris

然后你就可以通过 Flink Web UI 找到正在运行的名为 Sync MySQL Database to Doris 的作业。

http://www.lryc.cn/news/471757.html

相关文章:

  • git合并相关操作详解
  • 前端经典【面试题】持续更新HTML、CSS、JS、VUE、FLUTTER、性能优化等
  • 【Linux知识】linux磁盘管理深入了解
  • Qt Essential Classes
  • 小小猫棒onu替换家用光猫,薅运营商带宽羊毛,突破1000M
  • 软件测试学习笔记丨Selenium学习笔记:css定位
  • python数据处理常用操作
  • 解决minio跨域问题
  • python 跳过当前循环
  • 数据库数据恢复—Oracle ASM磁盘组掉线 ,ASM实例无法挂载的数据恢复案例
  • jupyter notebook改变默认启动路径
  • libevent源码剖析-基本数据结构
  • 往期文章汇总——射频测量+无线通信+软件无线电+6G科普
  • 微信小程序 - 深 / 浅拷贝实现方法,微信小程序深拷贝与浅拷贝,函数方法封装直接调用使用,深拷贝cloneDeep和浅拷贝clone(深复制和浅复制)
  • Log4Net配置详解及输出自定义消息类示例代码
  • C++在实际项目中的应用第二节:C++与区块链
  • 浅记React面试丢人时刻
  • Python入门:学会Python装饰器让你的代码如虎添翼!(Python如何不改动原有函数代码添加一些额外的功能)
  • 【C++】哈希冲突的解决办法:闭散列 与 开散列
  • 复刻系列-原神 5.1 版本先行展示页
  • STM32 第3章 如何用串口下载程序
  • HT71782 20V,15A全集成同步升压转换器
  • [含文档+PPT+源码等]精品基于PHP实现的培训机构信息管理系统的设计与实现
  • 亚信安全DeepSecurity中标知名寿险机构云主机安全项目
  • 论文解析八: GAN:Generative Adversarial Nets(生成对抗网络)
  • 【ARM】ARM架构参考手册_Part B 内存和系统架构(2)
  • HttpServer模块 --- 封装TcpServer支持Http协议
  • 蓝牙资讯|iOS 18.1 正式版下周推送,AirPods Pro 2耳机将带来助听器功能
  • C语言之环形缓冲区概述及实现
  • C++Socket通讯样例(服务端)