当前位置: 首页 > news >正文

Flink on Kubernetes (flink-operator) 部署Flink

flink on k8s 官网

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/try-flink-kubernetes-operator/quick-start/
我的部署脚本和官网不一样,有些地方官网不够详细

部署k8s集群

注意,按照默认配置至少有两台worker

安装helm

https://helm.sh/zh/docs/intro/install/

安装flink opreator

kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.1.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator -n flink
  • 安装完成后,资源如下
[root@k8s1 flinkinstall]# helm list -n flink
NAME                            NAMESPACE       REVISION        UPDATED                                 STATUS          CHART                           APP VERSION
flink-kubernetes-operator       flink           1               2024-03-07 16:57:48.374299701 +0800 CST deployed        flink-kubernetes-operator-1.7.0 1.7.0 
[root@k8s1 flinkinstall]# kubectl get all -A
NAMESPACE              NAME                                             READY   STATUS    RESTARTS   AGE
cert-manager           pod/cert-manager-66b646d76-gkw55                 1/1     Running   1          2d2h
cert-manager           pod/cert-manager-cainjector-59dc9659c7-pkgrm     1/1     Running   1          2d2h
cert-manager           pod/cert-manager-webhook-7f7787f7fd-wd5vv        1/1     Running   1          2d2h
flink                  pod/flink-kubernetes-operator-857d48ff65-45mg2   2/2     Running   6          5d20hNAMESPACE              NAME                                     TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)                  AGE
cert-manager           service/cert-manager                     ClusterIP   192.178.138.19    <none>        9402/TCP                 2d2h
cert-manager           service/cert-manager-webhook             ClusterIP   192.178.130.219   <none>        443/TCP                  2d2h
flink                  service/flink-operator-webhook-service   ClusterIP   192.178.139.67    <none>        443/TCP                  5d20hNAMESPACE              NAME                                        READY   UP-TO-DATE   AVAILABLE   AGE
cert-manager           deployment.apps/cert-manager                1/1     1            1           2d2h
cert-manager           deployment.apps/cert-manager-cainjector     1/1     1            1           2d2h
cert-manager           deployment.apps/cert-manager-webhook        1/1     1            1           2d2h
flink                  deployment.apps/flink-kubernetes-operator   1/1     1            1           5d20hNAMESPACE              NAME                                                   DESIRED   CURRENT   READY   AGE
cert-manager           replicaset.apps/cert-manager-66b646d76                 1         1         1       2d2h
cert-manager           replicaset.apps/cert-manager-cainjector-59dc9659c7     1         1         1       2d2h
cert-manager           replicaset.apps/cert-manager-webhook-7f7787f7fd        1         1         1       2d2h
flink                  replicaset.apps/flink-kubernetes-operator-857d48ff65   1         1         1       5d20h
  • 此时k8s集群就可以支持我们按照flink-opreator的指定格式提交flink任务了

提交flink任务

session模式与application模式区别在于资源隔离度

  • session模式: jobmanager预先启动,随时准备接收flink jar,启动taskmanager,flink任务结束后jobmanager不退出,所有flink任务共享同一个jobmanager,资源隔离差,某个flink任务导致jobmanager异常,会影响到其他flink任务,小任务,不在乎异常情况可以用
  • application模式:每次提交flink任务才会启动一个jobmanger,flink任务结束后,jobmanager也退出,隔离效果好,生产常用
  • per-job模式:这个模式与application模式类似, 区别在于client的运行位置,但是新版的flink已经删除了这种提交方式

这里是flink on yarn的运行模式
https://blog.csdn.net/java_creatMylief/article/details/126172793

application模式

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:name: pod-template-example
spec:image: flink:1.15flinkVersion: v1_15flinkConfiguration:taskmanager.numberOfTaskSlots: "2"serviceAccount: flinkpodTemplate:apiVersion: v1kind: Podmetadata:name: pod-templatespec:serviceAccount: flinkcontainers:# Do not change the main container name- name: flink-main-containerenv:volumeMounts:- mountPath: /flink-logsname: flink-logsinitContainers:- name: init-nginximage: busyboxcommand: [ 'sh','-c','wget http://192.168.33.2/phoenix-client-1.0-SNAPSHOT-jar-with-dependencies.jar -O /flink-logs/StateMachineExample1.jar' ]volumeMounts:- mountPath: /flink-logsname: flink-logsvolumes:- name: flink-logsemptyDir: { }jobManager:resource:memory: "1024m"cpu: 1taskManager:resource:memory: "1024m"cpu: 1job:jarURI: local:///flink-logs/StateMachineExample1.jarparallelism: 1entryClass: org.examplexxx.testargs: [/path/from/data,/path/to/data]initialSavepointPath: hdfs://flink/ckpath/xxxxxkubectl apply -f ${name}.yamlkubectl port-forward svc/basic-example-rest 8081 --address 192.168.33.81访问 http://192.168.33.81:8081

jarURI: local:///flink-logs/StateMachineExample1.jar
此处jarURL只得是docker内部路径,且不支持远程路径(http/s3/hdfs),因此需要将jar包放到docker内部。

1、可以将flink版本和jar包打到一个镜像中。
2、可以使用pvc挂载进去。
3、使用initContainers和 containers使用相同的挂载路径,然后使用远程文件下载放到挂载路径中,containers就能获取到该jar包

此处使用第三种情况,使用initContainers变相支持远程文件地址,使用起来比较方便。

yarn-application 对比

yarn-applicationk8s-application
-p (并行度)spec.job.parallelism
-yjm (jobmanager内存)spec.jobManager.resource.memory
-ytm (taskmanager内存)spec.taskManager.resource.memory
-ys (taskmanger的slot槽数)spec.flinkConfiguration.taskmanager.numberOfTaskSlots
-c (主类)spec.job.entryClass
jar (jar包)spec.job.jarURI
-s (恢复点启动)spec.job.initialSavepointPath

session模式

部署session cluster

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:namespace: flinkname: session-deployment-only
spec:image: flink:1.13.6flinkVersion: v1_13imagePullPolicy: IfNotPresent # 镜像拉去策略,优先本地,没有,仓库拉去ingress:template: "flink.k8s.io/{{namespace)}/{{name}}(/|$)(.*)"className: "Nginx"annotations:taskmanager.numberOfTaskSlots: "2"serviceAccount: flinkjobManager:replicas: 1resource:memory: "1024m"cpu: 1taskManager:replicas: 1resource:memory: "1024m"cpu: 1kubectl apply -f ${name}.yaml

在这里插入图片描述
部署cluster完成,配置svcType 后即可访问,flink web ui,此时jobManager是启动着的 taskmanager随着flink jar进行启动和停止

部署flink jar

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:namespace: flinkname: session-job-onlyjob:jarUrl: sasaentryClass: aaparallelism: 1upgradeMode: statelesskubectl apply -f ${name}.yaml
http://www.lryc.cn/news/328387.html

相关文章:

  • 代码随想录算法训练营第三十二天|122.买卖股票的最佳时机II、55. 跳跃游戏、45.跳跃游戏II
  • 常见数据库分类介绍及其适用场景
  • 周末总结(2024/03/30)
  • (75)爬楼梯
  • ttkbootstrap界面美化系列之Notebook(四)
  • MySQL8存储过程整合springboot
  • Acwing 1238.日志统计 双指针
  • Matlab-R2022b-安装文件分享
  • Flutter开发之objectbox
  • AI Drug Discovery Design(学习路线)
  • 【软考】设计模式之状态模式
  • MNN介绍、安装与编译:移动端深度学习推理引擎
  • A Simple Problem with Integers(线段树)
  • 单元测试(UT)用例简介
  • Java通过反射机制获取类对象下的属性值
  • IDEA插件开发-File -> New->Project中添加一个myOptions
  • 海量数据处理项目-账号微服务和流量包数据库表+索引规范(下)
  • Nodejs 16与 gitbook搭建属于你自己的书本网站-第一篇
  • 服务器被CC攻击之后怎么办?
  • pygame通过重心坐标 用纹理填充三角形
  • Leetcode 611. 有效三角形的个数
  • Openfeign
  • 五、基于KubeAdm搭建多节点K8S集群
  • PC电脑技巧[笔记本通过网线访问设备CMW500]
  • 【接口自动化测试框架】YAML管理接口框架配置的最佳实践
  • 【进程OI】基本文件操作的系统调用
  • Ubuntu20.04 server系统部署安装(VMware上)和初始化配置
  • 图论最短路径以及floyd算法的MATLAB实现
  • 微信小程序 - 登录功能实现
  • Python连接MySQL