当前位置: 首页 > news >正文

flink StreamGraph 构造flink任务

文章目录

      • 背景
      • 主要步骤
      • 代码

背景

通常使用flink 提供的高级算子来编写flink 任务,对底层不是很了解,尤其是如何生成作业图的细节
下面通过构造一个有向无环图,来实际看一下

主要步骤

1.增加source
2.增加operator
3. 增加一条边,连接source和operator
4. 增加sink
5. 增加一条边,连接operator和sink

代码

 // Step 1: Create basic configurationsConfiguration configuration = new Configuration();ExecutionConfig executionConfig = new ExecutionConfig();CheckpointConfig checkpointConfig = new CheckpointConfig();SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();// Step 2: Create a new StreamGraph instanceStreamGraph streamGraph = new StreamGraph(configuration, executionConfig, checkpointConfig, savepointRestoreSettings);// Step 3: Add a source operatorGeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;DataGeneratorSource<String> source = new DataGeneratorSource<>(generatorFunction, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.STRING);SourceOperatorFactory<String> sourceOperatorFactory = new SourceOperatorFactory<>(source, WatermarkStrategy.noWatermarks());streamGraph.addSource(1, "sourceNode", "sourceDescription", sourceOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "sourceSlot");// Step 4: Add a map operator to transform the dataStreamMap<String, String> mapOperator = new StreamMap<>(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value;}});SimpleOperatorFactory<String> mapOperatorFactory = SimpleOperatorFactory.of(mapOperator);streamGraph.addOperator(2, "mapNode", "mapDescription", mapOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "mapSlot");// Step 5: Connect source and map operatorstreamGraph.addEdge(1, 2, 0);// Step 6: Add a sink operator to consume the dataStreamMap<String, String> sinkOperator = new StreamMap<>(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {System.out.println(value);return value;}});SimpleOperatorFactory<String> sinkOperatorFactory = SimpleOperatorFactory.of(sinkOperator);streamGraph.addSink(3, "sinkNode", "sinkDescription", sinkOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "sinkSlot");// Step 7: Connect map and sink operatorstreamGraph.addEdge(2, 3, 0);streamGraph.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);streamGraph.setMaxParallelism(1,1);streamGraph.setMaxParallelism(2,1);streamGraph.setMaxParallelism(3,1);streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);// Step 8: Convert StreamGraph to JobGraphJobGraph jobGraph = streamGraph.getJobGraph();// Step 9: Set up a MiniCluster for local executionMiniClusterConfiguration miniClusterConfig = new MiniClusterConfiguration.Builder().setNumTaskManagers(10).setNumSlotsPerTaskManager(10).build();MiniCluster miniCluster = new MiniCluster(miniClusterConfig);// Step 10: Start the MiniClusterminiCluster.start();// Step 11: Submit the job to the MiniClusterJobExecutionResult result = miniCluster.executeJobBlocking(jobGraph);System.out.println("Job completed with result: " + result);// Step 12: Stop the MiniClusterminiCluster.close();
http://www.lryc.cn/news/486351.html

相关文章:

  • 【51单片机】LCD1602液晶显示屏
  • 理解 HTML5 Canvas 中逻辑像素与物理像素的关系
  • 7.揭秘C语言输入输出内幕:printf与scanf的深度剖析
  • 数据分析-系统认识数据分析
  • 蓝桥杯介绍
  • 鸿蒙加载网络图片并转换成PixelMap
  • hive搭建
  • 51c扩散模型~合集1
  • 从零开始深度学习:全连接层、损失函数与梯度下降的详尽指南
  • Liebherr利勃海尔 EDI 需求分析
  • java小练习
  • go语言中的占位符有哪些
  • 基于Windows安装opus python库
  • 【设计模式】行为型模式(五):解释器模式、访问者模式、依赖注入
  • 使用nossl模式连接MySQL数据库详解
  • 【MySQL】ubantu 系统 MySQL的安装与免密码登录的配置
  • 高级 SQL 技巧讲解
  • 浅论AI大模型在电商行业的发展未来
  • 【python笔记03】《类》
  • Flutter 应用在真机上调试的流程
  • 以太坊基础知识结构详解
  • 安全见闻(完整版)
  • LeetCode100之反转链表(206)--Java
  • 牛客周赛第一题2024/11/17日
  • 麒麟Server下安装东方通TongLINK/Q
  • BERT的中文问答系统33
  • Ubuntu下的Eigen库的安装及基本使用教程
  • 【spring 】Spring Cloud Gateway 的Filter学习
  • 每秒交易数(Transactions Per Second:TPS)详细拆解
  • 【初阶数据结构与算法】链表刷题之链表分割、相交链表、环形链表1、环形链表I、环形链表II