当前位置: 首页 > news >正文

Flink DataStream API详解

DataStream API

参考:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html

Data Sources

Source是程序读取其输入的位置,您可以使用env.addSource(sourceFunction)将Source附加到程序中。Flink内置了许多预先实现的SourceFunction,但是您始终可以通过实现SourceFunction(non-parallel sources)来编写自定义Source,或者通过继承RichParallelSourceFunction或实现ParallelSourceFunction接口来实现并行Source.

File-based

readTextFile(path) - 逐行读取文本文件,底层使用TextInputFormat规范读取文件,并将其作为字符串返回

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines:DataStream[String]=env.readTextFile("file:///E:\\demo\\words")
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(仅仅读取一次,类似批处理)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputFormat=new TextInputFormat(null)
val lines:DataStream[String]=env.readFile(inputFormat,"file:///E:\\demo\\words")
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

readFile(fileInputFormat, path, watchType, interval, pathFilter) - 这是前两个内部调用的方法。它根据给定的FileInputFormat读取指定路径下的文件,可以根据watchType定期检测指定路径下的文件,其中watchType的可选值为FileProcessingMode.PROCESS_CONTINUOUSLY或者FileProcessingMode.PROCESS_ONCE,检查的周期由interval参数决定。用户可以使用pathFilter参数排除该路径下需要排除的文件。如果指定watchType的值被设置为PROCESS_CONTINUOUSLY,表示一旦文件内容发生改变,整个文件内容会被重复处理。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputFormat=new TextInputFormat(null)
val lines:DataStream[String]=env.readFile(inputFormat,"file:///E:\\demo\\words",FileProcessingMode.PROCESS_CONTINUOUSLY,5000,new FilePathFilter {override def filterPath(filePath: Path): Boolean = {filePath.getPath.endsWith(".txt")}})
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

Socket-based

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines:DataStream[String]=env.socketTextStream("centos",9999)
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

Collection-based(测试)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines:DataStream[String]=env.fromCollection(List("this is a demo","good good"))
//val lines:DataStream[String]=env.fromElements("this is a demo","good good")
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

Custom Source

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import scala.util.Randomclass CustomSourceFunction extends ParallelSourceFunction[String]{@volatilevar isRunning:Boolean = trueval lines:Array[String] = Array("this is a demo","hello word","are you ok")override def run(ctx: SourceFunction.SourceContext[String]): Unit = {while(isRunning){Thread.sleep(1000)ctx.collect(lines(new Random().nextInt(lines.length)))//将数据输出给下游}}override def cancel(): Unit = {isRunning=false}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines:DataStream[String]=env.addSource[String](new CustomSourceFunction)
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

FlinkKafkaConsumer√

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.version}</artifactId><version>${flink.version}</version>
</dependency>
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
val lines=env.addSource(new FlinkKafkaConsumer("topic01",new SimpleStringSchema(),props))lines.flatMap(_.split("\\s+")).map((_,1)).keyBy(t=>t._1).sum(1).print()
env.execute("wordcount")

如果使用SimpleStringSchema,仅仅能获取value,如果用户希望获取更多信息,比如 key/value/partition/offset ,用户可以通过继承KafkaDeserializationSchema类自定义反序列化对象。

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.flink.streaming.api.scala._class UserKafkaDeserializationSchema extends KafkaDeserializationSchema[(String,String)] {//这个方法永远返回falseoverride def isEndOfStream(nextElement: (String, String)): Boolean = {false}override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {var key=""if(record.key()!=null && record.key().size!=0){key=new String(record.key())}val value=new String(record.value())(key,value)}//告诉Flink tuple元素类型override def getProducedType: TypeInformation[(String, String)] = {createTypeInformation[(String, String)]}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
val lines:DataStream[(String,String)]=env.addSource(new FlinkKafkaConsumer("topic01",new UserKafkaDeserializationSchema(),props))
lines.map(t=>t._2).flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

如果Kafka存储的都是json格式的字符串数据,用户可以使用系统自带的一些支持json的Schema,推荐使用:

  • JsonNodeDeserializationSchema:要求value必须是json格式的字符串
  • JSONKeyValueDeserializationSchema(meta):要求key、value都必须是josn格式数据,同时可以携带元数据(分区、 offset等)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
val jsonData:DataStream[ObjectNode]=env.addSource(new FlinkKafkaConsumer("topic01",new JSONKeyValueDeserializationSchema(true),props))
jsonData.map(on=> (on.get("value").get("id").asInt(),on.get("value").get("name")))
.print()
env.execute("wordcount")

Data Sinks

Data Sinks接收DataStream数据,并将其转发到指定文件,socket,外部存储系统或者print它们,Flink预定义一些输出Sink。

File-based

write*:writeAsText|writeAsCsv(…)|writeUsingOutputFormat,请注意DataStream上的write*()方法主要用于调试目的。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.writeAsText("file:///E:/results/text",WriteMode.OVERWRITE)
env.execute("wordcount")

以上写法只能保证at_least_once的语义处理,如果是在生产环境下,推荐使用flink-connector-filesystem将数据写到外围系统,可以保证exactly-once语义处理。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
val bucketingSink = new BucketingSink[(String,Int)]("hdfs://centos:9000/BucketingSink")
bucketingSink.setBucketer(new DateTimeBucketer("yyyyMMddHH"))//文件目录
bucketingSink.setBatchSize(1024)
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(bucketingSink)
.setParallelism(6)
env.execute("wordcount")

print() | printToErr()

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
fsEnv.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.print("测试") //输出前缀  当有多个流输出到控制台时,可以添加前缀加以区分
.setParallelism(2)
env.execute("wordcount")

Custom Sink

class  CustomSinkFunction extends  RichSinkFunction[(String,Int)]{override def open(parameters: Configuration): Unit = {println("初始化连接")}override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {println(value)}override def close(): Unit = {println("关闭连接")}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new CustomSinkFunction)
env.execute("wordcount")

RedisSink√

  • 添加
<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>
class UserRedisMapper extends RedisMapper[(String,Int)]{// 设置数据类型override def getCommandDescription: RedisCommandDescription = {new RedisCommandDescription(RedisCommand.HSET,"wordcount")}override def getKeyFromData(data: (String, Int)): String = {data._1}override def getValueFromData(data: (String, Int)): String = {data._2.toString}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
val jedisConfig=new FlinkJedisPoolConfig.Builder()
.setHost("centos")
.setPort(6379)
.build()
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new RedisSink[(String, Int)](jedisConfig,new UserRedisMapper))
env.execute("wordcount")

FlinkKafkaProducer√

class UserKeyedSerializationSchema extends KeyedSerializationSchema[(String,Int)]{
Intoverride def serializeKey(element: (String, Int)): Array[Byte] = {element._1.getBytes()}override def serializeValue(element: (String, Int)): Array[Byte] = {element._2.toString.getBytes()}//可以覆盖 默认是topic,如果返回值为null,表示将数据写入到默认的topic中override def getTargetTopic(element: (String, Int)): String = {null}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props1 = new Properties()
props1.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos:9092")
props1.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g1")
val props2 = new Properties()
props2.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos:9092")
props2.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"100")
props2.setProperty(ProducerConfig.LINGER_MS_CONFIG,"500")
props2.setProperty(ProducerConfig.ACKS_CONFIG,"all")
props2.setProperty(ProducerConfig.RETRIES_CONFIG,"2")
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props1))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new FlinkKafkaProducer[(String, Int)]("topic02",new UserKeyedSerializationSchema,props2))
env.execute("wordcount")

DataStream Transformations

Map

Takes one element and produces one element.

dataStream.map { x => x * 2 }

FlatMap

Takes one element and produces zero, one, or more elements.

dataStream.flatMap { str => str.split(" ") }

Filter

Evaluates a boolean function for each element and retains those for which the function returns true.

dataStream.filter { _ != 0 }

Union

Union of two or more data streams creating a new stream containing all the elements from all the streams.

dataStream.union(otherStream1, otherStream2, ...)

Connect

“Connects” two data streams retaining their types, allowing for shared state between the two streams.

val stream1 = env.socketTextStream("centos",9999)
val stream2 = env.socketTextStream("centos",8888)
stream1.connect(stream2).flatMap(line=>line.split("\\s+"),line=>line.split("\\s+"))
.map(Word(_,1))
.keyBy("word")
.sum("count")
.print()

Split

Split the stream into two or more streams according to some criterion.

val split = someDataStream.split((num: Int) =>(num % 2) match {case 0 => List("even")case 1 => List("odd")}
)               

Select

Select one or more streams from a split stream.

val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
val lines = env.socketTextStream("centos",9999)
val splitStream: SplitStream[String] = lines.split(line => {if (line.contains("error")) {List("error") //分支名称} else {List("info") //分支名称}
})
splitStream.select("error").print("error")
splitStream.select("info").print("info")

Side Out

val lines = env.socketTextStream("centos",9999)
//设置边输出标签 
val outTag = new OutputTag[String]("error") 
val results = lines.process(new ProcessFunction[String, String] {override def processElement(value: String, ctx: ProcessFunction[String,                 String]#Context, out: Collector[String]): Unit = {if (value.contains("error")) {ctx.output(outTag, value)} else {out.collect(value)}}
})
results.print("正常结果")
//获取边输出
results.getSideOutput(outTag)
.print("错误结果")

KeyBy

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning.

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

Reduce

A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.reduce((t1,t2)=>(t1._1,t1._2+t2._2))
.print()

Fold

A “rolling” fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.fold(("",0))((t1,t2)=>(t2._1,t1._2+t2._2))
.print()

Aggregations

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

zs 001 1200
ww 001 1500
zl 001 1000
env.socketTextStream("centos",9999)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1),ts(2).toDouble))
.keyBy(1)
.minBy(2)//输出含有最小值的记录
.print()
1> (zs,001,1200.0)
1> (zs,001,1200.0)
1> (zl,001,1000.0)
env.socketTextStream("centos",9999)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1),ts(2).toDouble))
.keyBy(1)
.min(2)
.print()
1> (zs,001,1200.0)
1> (zs,001,1200.0)
1> (zs,001,1000.0)
http://www.lryc.cn/news/108798.html

相关文章:

  • 【如何使用cv::erode()函数对图像进行腐蚀操作】
  • C++数据结构之BST(二叉搜索树)的实现
  • QT以管理员身份运行
  • java中的缓冲流
  • 【小吉带你学Git】idea操作(1)_配置环境并进行基本操作
  • DP-GAN-生成器代码
  • 2020-2023中国高等级自动驾驶产业发展趋势研究
  • JDK19 - synchronized关键字导致的虚拟线程PINNED
  • 用msys2安装verilator并用spinal进行仿真
  • 【ARM64 常见汇编指令学习 13 -- ARM 汇编 ORG 伪指令学习】
  • Vue使用QuillEditor富文本编辑器问题记录
  • spring AOP学习
  • 16.M端事件和JS插件
  • Zebec APP:构建全面、广泛的流支付应用体系
  • Spark 3.1.1 遇到的 from_json regexp_replace组合表达式慢问题的解决
  • Docker 容器常用的命令和操作
  • iTOP-RK3568开发板Windows 安装 RKTool 驱动
  • nginx rtmp http_flv直播推流
  • Day50 算法记录| 动态规划 17(子序列)
  • RabbitMQ:概念和安装,简单模式,工作,发布确认,交换机,死信队列,延迟队列,发布确认高级,其它知识,集群
  • 小研究 - 基于解析树的 Java Web 灰盒模糊测试(二)
  • 对于现有的分布式id发号器的思考 id生成器 雪花算法 uuid
  • jmeter中json提取器,获取多个值,并通过beanshell组成数组
  • 通过nvm工具快捷切换node.js版本、以及nvm的安装
  • 企业如何搭建矩阵内容,才能真正实现目的?
  • Arduino驱动MQ5模拟煤气气体传感器(气体传感器篇)
  • Mongodb安装(Centos7)
  • Python 批量处理JSON文件,替换某个值
  • 凯迪正大—SF6泄漏报警装置的主要特点
  • 适配器模式与装饰器模式对比分析:优雅解决软件设计中的复杂性