当前位置: 首页 > news >正文

flink Data Source数据源

flink

Data Source数据源

Source

  • 并行度

    • 非并行:并行度只能为1

    • 并行

  • 基于集合的Source

    • fromElements

      • package com.pxj.sx.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FromElementDemo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> daat = env.fromElements("flink", "spark", "hive");daat.print();Thread.sleep(2000000);}
}
	- fromElements(T ...) 方法是一个非并行的Source,可以将一到多个数据作为可变参数传入到该方法中,返回DataStreamSource。该方法返回的DataStream是一个有限数据流,数据读完后,程序退出,通常用于开发测试。-  fromCollection- fromCollection可以从一个结合读取数据,返回DataStream,该方法返回的DataStream是一个有限数据流,数据读完后,程序退出,通常用于开发测试。- package com.pxj.sx.flink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;
import java.util.List;public class FromCollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();List<String> wordList = Arrays.asList("flink", "spark", "hadoop", "flink");DataStreamSource<String> source = env.fromCollection(wordList);source.print();env.execute("pxj");}
}
- fromParallelCollection- fromParallelCollection(SplittableIterator, Class) 方法是一个并行的Source(并行度可以使用env的setParallelism来设置),该方法需要传入两个参数,第一个是继承SplittableIterator的实现类的迭代器,第二个是迭代器中数据的类型。该方法返回的DataStream是一个有限数据流,数据读完后,程序退出,通常用于开发测试。- package com.pxj.sx.flink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.LongValueSequenceIterator;public class FromParallelCollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//Source是多个并行的DataStreamSource<LongValue> dataSource = env.fromParallelCollection(new LongValueSequenceIterator(1, 10), LongValue.class);dataSource.print();env.execute("pxj");}
}
- generateSequence- generateSequence(long from, long to) 方法是一个并行的Source(并行度也可以通过调用该方法后,再调用setParallelism来设置)该方法需要传入两个long类型的参数,第一个是起始值,第二个是结束值,返回一个DataStreamSource。该方法返回的DataStream是一个有限数据流,数据读完后,程序退出,通常用于开发测试。
  • 基于Socket网络端口

    • socketTextStream(String hostname, int port) 方法是一个非并行的Source,该方法需要传入两个参数,第一个是指定的IP地址或主机名,第二个是端口号,即从指定的Socket读取数据创建DataStream。该方法还有多个重载的方法,其中一个是socketTextStream(String hostname, int port, String delimiter, long maxRetry),这个重载的方法可以指定行分隔符和最大重新连接次数。这两个参数,默认行分隔符是"\n",最大重新连接次数为0。

      • package com.pxj.sx.flink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SocktDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("pxj62", 8889);source.print();env.execute("pxj");}
}
  • 基于文件

    • readFile

      • package com.pxj.sx.flink;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;public class ReadFlie {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.readFile(new TextInputFormat(null), "data/a.txt",FileProcessingMode.PROCESS_CONTINUOUSLY, 2000).print();env.execute("pxj");}
}
- readTextFile- package com.pxj.sx.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class ReadFlieDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.readTextFile("data/a.txt");SingleOutputStreamOperator<Tuple2<String, Integer>> datas = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] strings = value.split(",");for (String s : strings) {out.collect(Tuple2.of(s, 1));}}});SingleOutputStreamOperator<Tuple2<String, Integer>> summed = datas.keyBy(0).sum(1);summed.print();env.execute("pxj");}
}
  • 自定义Source

    • 单并行度

      • 可以实现 SourceFunction 或者 RichSourceFunction , 这两者都是非并行的source算子

        • package com.pxj.sx.flink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class MySource2{public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new MySource3());source.print();env.execute("pxj");}
}
class  MySource3 extends RichSourceFunction<String> {private int i=0; //定义一个int类型的变量,从1开始private boolean flag=true;  //定义一个flag标标志//run方法就是用来读取外部的数据或产生数据的逻辑@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (i<=100 && flag){Thread.sleep(1000); //为避免太快,睡眠1秒ctx.collect("data:"+i++);}}@Overridepublic void cancel() {flag=false;}
}
- 多并行度-    也可继承   ParallelSourceFunction  或者 RichParallelSourceFunction , 这两者都是可并行的source算子- 带 Rich的,都拥有 open() ,close() ,getRuntimeContext() 方法

带 Parallel的,都可多实例并行执行source

				- package com.pxj.sx.flink;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;public class MySource1{public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new MySource());source.print();env.execute("pxj");}
}
class  MySource extends RichParallelSourceFunction<String> {private int i=0; //定义一个int类型的变量,从1开始private boolean flag=true;  //定义一个flag标标志//run方法就是用来读取外部的数据或产生数据的逻辑@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (i<=100 && flag){Thread.sleep(1000); //为避免太快,睡眠1秒ctx.collect("data:"+i++);}}@Overridepublic void cancel() {flag=false;}
}

整理人:pxj_sx(潘陈)
日 期:2024-05-26 11:47:24

http://www.lryc.cn/news/353647.html

相关文章:

  • 网络七层模型与云计算中的网络服务
  • word一按空格就换行怎么办?word文本之间添加空格就换行怎么办?
  • Python 遍历字典的方法,你都掌握了吗
  • MySQL 8.4.0 LTS 变更解析:I_S 表、权限、关键字和客户端
  • LeetCode 124 —— 二叉树中的最大路径和
  • 美甲店会员预约系统管理小程序的作用是什么
  • ..堆..
  • 【LLM多模态】综述Visual Instruction Tuning towards General-Purpose Multimodal Model
  • 探索Linux中的神奇工具:重定向符的妙用
  • Kubernetes 文档 / 概念 / 工作负载 / 工作负载管理 / Job
  • 办公自动化-Python如何提取Word标题并保存到Excel中?
  • 基于Java、SpringBoot和uniapp在线考试系统安卓APP和微信小程序
  • 抖音a-bogus加密解析(三)
  • IS-IS DIS
  • random和range
  • 研二学妹面试字节,竟倒在了ThreadLocal上,这是不要应届生还是不要女生啊?
  • Golang:gammazero/deque是一个快速环形缓冲区deque(双端队列)实现
  • C++ 时间处理-统计函数运行时间
  • JAVA面试题大全(十五)
  • 使用python对指定文件夹下的pdf文件进行合并
  • Day50 | 309.最佳买卖股票时机含冷冻期 714.买卖股票的最佳时机含手续费 总结
  • Steam在连接至服务器发生错误/连接服务器遇到问题解决办法
  • kafka 工作流程文件存储
  • 贪心算法4(c++)
  • 【无标题】yoloV8目标检测与实例分割--目标检测onnx模型部署
  • 深入理解与防御跨站脚本攻击(XSS):从搭建实验环境到实战演练的全面教程
  • 初步认识栈和队列
  • 插件:NGUI
  • 网络爬虫原理及其应用
  • 串口中断原理及实现