当前位置: 首页 > news >正文

【Flink-scala】DataStream编程模型之状态编程

DataStream编程模型之状态编程

参考:
1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出
2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序
3.【Flink-scala】DataStream编程模型之窗口计算-触发器-驱逐器
4.【Flink-scala】DataStream编程模型之水位线
5.【Flink-scala】DataStream编程模型之延迟数据处理


文章目录

  • DataStream编程模型之状态编程
  • 前言
  • 一、状态编程相关概念
    • 1.1Flink中状态始终与特定算子相关联
    • 1.2 演示代码
    • 1.3 状态编程程序输入输出


前言

流计算分为无状态和有状态两种,无状态是观察每个独立事件,根据最后一个事件输出结果。比如传感器只关注当前的水位量,超出水位量就发生报警事件。
有状态计算则会基于多个事件输出结果。比如计算过去1小时的水位平均值,那就是状态的计算。

一、状态编程相关概念

流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态计算。

在传统的批处理中,数据的划分为块分片去完成的,每个task处理一个分片,执行完成后,把结果聚合起来就是最终的结果,这个过程中,对状态的需求还是较少的。

但对于流计算而言,它对状态有着非常高的要求,因为在流系统中,输入是一个无限制的流,会运行很长一段时间,甚至运行几天或者几个月都不会停机。在这个过程当中,就需要把状态数据很好地管理起来

1.1Flink中状态始终与特定算子相关联

分为算子状态和键控状态
在这里插入图片描述
算子状态的作用范围限定为算子任务,这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。

算子状态不能由相同或不同算子的另一个任务访问

键控状态是根据输入数据流中定义的键来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据分区到同一个算子任务中,这个任务会维护和处理这个键对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的键。因此,具有相同键的所有数据都会访问相同状态

在这里插入图片描述

1.2 演示代码

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collectorcase class StockPrice(stockId:String,timeStamp:Long,price:Double)object StateTest {def main(args: Array[String]): Unit = {//设定执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment   
//设定程序并行度env.setParallelism(1) //创建数据源val source = env.socketTextStream("localhost", 9999) //指定针对数据流的转换操作逻辑val stockDataStream = source.map(s => s.split(",")).map(s => StockPrice(s(0).toString, s(1).toLong, s(2).toDouble))val alertStream = stockDataStream.keyBy(_.stockId).flatMap(new PriceChangeAlert(10))//新建了一个PriceChangeAlert类  这里重新了flatmap方法// 打印输出alertStream.print() //触发程序执行env.execute("state test")}class PriceChangeAlert(threshold: Double) extends RichFlatMapFunction[StockPrice,(String, Double, Double)]{//定义状态保存上一次的价格lazy val lastPriceState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-price",classOf[Double]))override def flatMap(value: StockPrice, out: Collector[(String, Double, Double)]): Unit = {// 获取上次的价格
val lastPrice = lastPriceState.value()
//跟最新的价格求差值做比较val diff = (value.price-lastPrice).absif( diff > threshold)out.collect((value.stockId,lastPrice,value.price))//更新状态lastPriceState.update(value.price)}}
}

代码分析:
1.传入参数,阈值
2.继承里接受一个stockPrice类型的输入,一个(String,Double,Double)三元组的输出。

String,Double,Double
case class StockPrice(stockId:String,timeStamp:Long,price:Double)

有什么不同呢,两个double代表了两个价格:分别代表股票ID、上次价格、当前价格。

3.ValueState是Flink中用于保存单个值的状态。这里它被用来保存上一次处理的股票价格。lazy关键字意味着这个状态变量只有在第一次被使用时才会被初始化
4…getState(new ValueStateDescriptor[Double](“last-price”, classOf[Double])): 这个方法尝试从运行时上下文中检索一个名为 “last-price” 的 ValueState,如果状态不存在,它将根据提供的 ValueStateDescriptor 创建一个新的状态。

ValueStateDescriptor 包含了状态的名称(代码中是 “last-price”)和状态的值的类型(这个代码中是 Double)。
5. classOf[Double] 提供了状态的值的类型信息。
6. 重写的flatmap应该能看懂,主要是当当前价格超出阈值(代码中是10),就打印。

1.3 状态编程程序输入输出

输入:

stock_4,1602031562148,43.4
stock_1,1602036130952,39.7
stock_4,1602036131741,59.9
stock_2,1602036132184,30.1
stock_3,1602036133154,79.8
stock_0,1602036133919,9.9
stock_1,1602036134385,21.7

输出:

(stock_4,0.0,43.4)
(stock_1,0.0,39.7)
(stock_4,43.4,59.9)
(stock_2,0.0,30.1)
(stock_3,0.0,79.8)
(stock_1,39.7,21.7)

其中根据stock_id分类。

初始状态:所有stockId的最近价格都是未定义的(即null或None,在代码中表现为Double的默认值0.0,因为ValueState在初始化时未设置值)。

处理第一条记录:stock_4,1602031562148,43.4。由于没有先前的价格,不会触发输出。最近价格更新为43.4。
处理第二条记录:stock_1,1602036130952,39.7。同样,没有先前的价格,不会触发输出。最近价格更新为39.7。
处理第三条记录:stock_4,1602036131741,59.9。价格从43.4变为59.9,差异为16.5,超过阈值10,因此输出(stock_4, 43.4, 59.9)。最近价格更新为59.9。
后续记录:对于stock_2、stock_3、stock_0,由于没有先前的价格,30.1 和79.8直接列出,
但是9.9这个价格要注意
stock_0,默认值为0,这里变为9.9,没有超出阈值10,那么输出就没有。

http://www.lryc.cn/news/505480.html

相关文章:

  • RabbitMQ的核心组件有哪些?
  • 【Linux基础】基本开发工具的使用
  • 常见的数据结构和应用场景
  • 爬虫基础学习
  • C++对象数组对象指针对象指针数组
  • D96【python 接口自动化学习】- pytest进阶之fixture用法
  • 【算法】动态规划中01背包问题解析
  • 选择WordPress和Shopify:搭建对谷歌SEO友好的网站
  • 代理IP与生成式AI:携手共创未来
  • iOS 应用的生命周期
  • Elasticsearch 集群快照的定期备份设置指南
  • Docker--Docker Image(镜像)
  • C++ 中的序列化和反序列化
  • 我的Github学生认证申请过程
  • 信奥题解:勾股数计算中的浮点数精度问题
  • 重生之我在学Vue--第2天 Vue 3 Composition API 与响应式系统
  • 【AI知识】逻辑回归介绍+ 做二分类任务的实例(代码可视化)
  • Mysql 笔记2 emp dept HRs
  • MySQL和Oracle的区别
  • 实验12 C语言连接和操作MySQL数据库
  • 09篇--图片的水印添加(掩膜的运用)
  • sql-labs(21-25)
  • CTF知识集-命令执行
  • 基于米尔全志T527开发板的OpenCV进行手势识别方案
  • Htpp中web通讯发送post(上传文件)、get请求
  • 【论文阅读笔记】HunyuanVideo: A Systematic Framework For Large Video Generative Models
  • SpringBoot的事务钩子函数
  • 源码安装PHP-7.2.19
  • UE5制作伤害浮动数字
  • 学习日志024--opencv中处理轮廓的函数