当前位置: 首页 > news >正文

Flink提交pyflink任务

1.官方文档:
flink1.14:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/#submitting-pyflink-jobs
flink1.18:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/cli/#submitting-pyflink-jobs

2.提交PyFlink作业 - Submitting PyFlink Jobs #

(1)环境检查

Currently, users are able to submit a PyFlink job via the CLI. It does not require to specify the JAR file path or the entry main class, which is different from the Java job submission.
官方翻译:当前用户可以通过命令行提交PyFlink作业。不要指定 jar 文件路径或者主类入口,跟Java作业提交不一样。

When submitting Python job via flink run, Flink will run the command “python”. Please run the following command to confirm that the python executable in current environment points to a supported Python version of 3.6+.
官方翻译:当使用 flink run 提交Python作业时,Flink会运行命令 python。请运行下面的命令确保Python可执行程序在当前环境中,并指向Python 3.6+ 版本。

$ python --version

the version printed here must be 3.6+

(2)运行PyFlink作业 - Run a PyFlink job

The following commands show different PyFlink job submission use-cases:
官方翻译:后续的命令展示了不同的PyFlink作业提交用例:

示例1:$ ./bin/flink run --python examples/python/table/word_count.py

Run a PyFlink job with additional source and resource files. Files specified in --pyFiles will be added to the PYTHONPATH and, therefore, available in the Python code.
官方翻译:使用额外的源和资源文件运行PyFlink作业。在 --pyFiles 指定的文件都会被加入到 PYTHONPATH 中,因此就在python代码中可用。

示例2:$ ./bin/flink run \--python examples/python/table/word_count.py \--pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt

Run a PyFlink job which will reference Java UDF or external connectors. JAR file specified in --jarfile will be uploaded to the cluster.
官方翻译:运行引用了Java自定义函数或者外部连接器的PyFlink作业。在 --jarfile 后指定的 jar 文件将会被上传到集群。

示例3:$ ./bin/flink run \--python examples/python/table/word_count.py \--jarfile <jarFile>

Run a PyFlink job with pyFiles and the main entry module specified in --pyModule:
官方翻译:使用 pyFiles 选项运行PyFlink作业需要使用 --pyModule 参数指定主模块入口:

示例4:$ ./bin/flink run \--pyModule table.word_count \--pyFiles examples/python/table

Submit a PyFlink job on a specific JobManager running on host (adapt the command accordingly):
官方翻译:将PyFlink作业提交到指定的 JVM 上运行:

示例5:$ ./bin/flink run \--jobmanager <jobmanagerHost>:8081 \--python examples/python/table/word_count.py

Run a PyFlink job using a YARN cluster in Per-Job Mode:
官方翻译:使用以每作业模式的 YARN 集群运行PyFlink作业:

示例6:$ ./bin/flink run \--target yarn-per-job--python examples/python/table/word_count.py

Run a PyFlink application on a native Kubernetes cluster having the cluster ID , it requires a docker image with PyFlink installed, please refer to Enabling PyFlink in docker:
官方翻译:在指定集群标识的 Kubernetes 原生集群上运行PyFlink应用,需要一个PyFlink的容器镜像,请参考在容器里启用PyFlink:

示例7:$ ./bin/flink run-application \--target kubernetes-application \--parallelism 8 \-Dkubernetes.cluster-id=<ClusterId> \-Dtaskmanager.memory.process.size=4096m \-Dkubernetes.taskmanager.cpu=2 \-Dtaskmanager.numberOfTaskSlots=4 \-Dkubernetes.container.image=<PyFlinkImageName> \--pyModule word_count \--pyFiles /opt/flink/examples/python/table/word_count.py
http://www.lryc.cn/news/538134.html

相关文章:

  • 对称算法模式之CTR
  • Map 和 Set
  • STOMP协议
  • 手动埋点的demo
  • 大模型开发实战篇5:多模态--文生图模型API
  • 【大模型】DeepSeek 高级提示词技巧使用详解
  • 【第14章:神经符号集成与可解释AI—14.2 可解释AI技术:LIME、SHAP等的实现与应用案例】
  • Python中使用Minio实现图像或视频文件的存储
  • Kubernetes-master 组件
  • 人形机器人 - 仿生机器人核心技术与大小脑
  • OpenAI 快速入门
  • nginx 实战配置
  • WebMvcConfigurer 介绍
  • java05(类、泛型、JVM、线程)---java八股
  • Python+appium实现自动化测试
  • Unity中如何判断URL是否为RTSP或RTMP流
  • 基于角色访问控制的UML 表示02
  • 【函数题】6-10 二分查找
  • 关于conda换镜像源,pip换源
  • DeepSeek与ChatGPT的全面对比
  • Spring AI发布!让Java紧跟AI赛道!
  • 基于CT107D单片机综合训练平台的秒表设计
  • opensuse [Linux] 系统挂在新的机械硬盘
  • 时间序列分析(四)——差分运算、延迟算子、AR(p)模型
  • 【CUDA】Triton
  • Windows环境搭建ES集群
  • langchain学习笔记之消息存储在内存中的实现方法
  • 怎么在智能合约中植入deepseek
  • 驱动开发系列37 - Linux Graphics 2D 绘制流程(二)- 画布创建和窗口关联
  • B. Longest Divisors Interval