当前位置: 首页 > news >正文

4.1、Flink任务怎样读取集合中的数据

1、API说明

非并行数据源:
        def fromElements[T: TypeInformation](data: T*): DataStream[T]
        def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T] 
        def fromCollection[T: TypeInformation] (data: Iterator[T]): DataStream[T] 

并行数据源:
        def fromParallelCollection[T: TypeInformation] (data: SplittableIterator[T])

使用场景:

        常用来调试代码使用


2、这是一个完整的入门案例

开发语言:Java1.8

Flink版本:flink1.17.0

package com.baidu.datastream.source;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;import java.util.Arrays;
import java.util.List;// --------------------------------------------------------------------------------------------
//  TODO 从集合中读取数据
// --------------------------------------------------------------------------------------------/**  TODO 通过`读取Java集合中数据`来创建 DataStreamSource**  方法1:fromCollection*        Collection、Iterator -> DataStreamSource*  方法2:fromElements*        OUT... data -> DataStreamSource*  方法3:fromParallelCollection*        SplittableIterator -> DataStreamSource*  重要提示:*       fromCollection、fromElements 创建的是非并行source算子(并行度只能为1)*       fromParallelCollection 创建的是并行算子(并行度>=1)* */public class ReadCollection {public static void main(String[] args) throws Exception {fromCollection();//fromElements();//fromParallelCollection();}public static void fromCollection() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);// 2.读取Java集合数据List<String> list = Arrays.asList("刘备", "张飞", "关羽", "赵云", "马超", "黄忠");env.fromCollection(list).print();// 3.触发程序执行env.execute();}public static void fromElements() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);// 2.读取给定的对象序列env.fromElements("刘备", "张飞", "关羽", "赵云", "马超", "黄忠").print();// 3.触发程序执行env.execute();}public static void fromParallelCollection() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);// 2.读取给定的对象序列NumberSequenceIterator numberSequenceIterator = new NumberSequenceIterator(1, 10);env.fromParallelCollection(numberSequenceIterator, Long.class).print();/** 注意: fromParallelCollection生成的source为并行算子*       集合中的数据会被平均分配到并行子任务中去* */// 3.触发程序执行env.execute();}
}

http://www.lryc.cn/news/117098.html

相关文章:

  • JD商品详情页面+关键词搜索商品列表API接口数据,详情页面数据返回值说明
  • Electron v26.0.0-beta.11 发布,跨平台桌面应用开发工具
  • 提高办案效率:公检系统引入自动校对技术
  • iptables 清空
  • 网络安全(黑客)零基础入门
  • Al Go: 蒙特卡洛树搜索(MCTS)简介
  • Client-go操作Deployment
  • 设计模式——单例模式(懒汉和饿汉)
  • 详解——Vue3递归函数功能
  • 【VSCode】查看二进制文件
  • C#设计模式之观察者模式
  • 小红书攻略:爆款引流,如何在激烈竞争中脱颖而出?
  • Ubuntu中的安装卸载及删除方法
  • 获取历史dokcer镜像项目,并上传gitlab,再打包镜像管理
  • 【Go语言】Golang保姆级入门教程 Go初学者chapter3
  • 网络防御(4)
  • conda错误处理:ResolvePackageNotFound
  • linux初学者小命令
  • 宝尊电商短期前景堪忧,宝尊国际能否取得成功还有待验证
  • 百川智能发布首个530亿参数闭源大模型,今年追上GPT-3.5
  • Redis的常用数据结构
  • 深入JVM - JIT分层编译技术与日志详解
  • 临时文档2
  • [深度学习入门]PyTorch深度学习[数组变形、批量处理、通用函数、广播机制]
  • 男孩向妈妈发脾气爸爸言传身教
  • uniapp实现自定义导航内容高度居中(兼容APP端以及小程序端与胶囊对齐)
  • Python调用外部电商API的详细步骤
  • 什么是NVME
  • 交叉编译驱动和应用出现警告提示错误“cc1:all warnings being treated as errors”解决方法
  • 基于nodejs+vue+uniapp微信小程序的短视频分享系统