当前位置: 首页 > news >正文

【Spark征服之路-4.5-Spark-Streaming核心编程(三)】

DStream转换

DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种 Window 相关的原语。

无状态转化操作

无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。部分无状态转化操作列在了下表中。

注意,针对键值对的 DStream 转化操作(比如reduceByKey())要添加

import StreamingContext._才能在 Scala 中使用。

需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个 DStream 在内部是由许多 RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。

例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。

Transform

Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
val ssc = new StreamingContext(sparkConf,Seconds(3))

val lineDStream :ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)
val wordAndCountDStream :DStream[(String,Int)] = lineDStream.transform(rdd => {
val words :RDD[String] = rdd.flatMap(_.split(" "))
val wordAndOne :RDD[(String,Int)] = words.map((_,1))
val value :RDD[(String,Int)] = wordAndOne.reduceByKey(_+_)
  value
})
wordAndCountDStream.print()

ssc.start()
ssc.awaitTermination()

join

两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("join")
val ssc = new StreamingContext(sparkConf,Seconds(3))

val lineDStream1 :ReceiverInputDStream[String] = ssc.
  socketTextStream("node01",9999)
val lineDStream2 :ReceiverInputDStream[String] = ssc.
  socketTextStream("node02",8888)

val wordToOneDStream :DStream[(String,Int)] = lineDStream1
  .flatMap(_.split(" ")).map((_,1))

val wordToADstream :DStream[(String,String)] = lineDStream2
  .flatMap(_.split(" ")).map((_,"a"))

val joinDStream :DStream[(String,(Int,String))]=wordToOneDStream
  .join(wordToADstream)

joinDStream.print()

ssc.start()
ssc.awaitTermination()

http://www.lryc.cn/news/609516.html

相关文章:

  • [Oracle] TO_CHAR()函数
  • 安装MySQL教程时可能遇到的问题
  • 【Linux】重生之从零开始学习运维之GTID复制
  • XXE漏洞原理及利用
  • NSS-DAY17 2025SWPU-NSSCTF
  • Chrontel 【CH7103B-B】CH7103B HDMI to YPbPr Converter
  • 行业报告:.games域名正引领游戏娱乐产业营销新风向
  • 力扣 hot100 Day65
  • 嵌入式学习之51单片机——串口(UART)
  • 回归预测 | MATLAB实现BP神经网络多输入单输出回归预测+SHAP可解释分析
  • 分布式光伏气象站:为分散电站装上 “智慧之眼”
  • 零基础掌握 Scrapy 和 Scrapy-Redis:爬虫分布式部署深度解析
  • 分布式版本控制工具Git
  • Spring之【Bean的实例化方式】
  • 电脑忘记开机密码怎么办?【图文详解】5种方法重置/更改/取消/设置开机密码?
  • Java从入门到精通 - 算法、正则、异常
  • 深入浅出 RabbitMQ:简单队列实战指南
  • 【Linux指南】软件安装全解析:从源码到包管理器的进阶之路
  • 小杰数据结构(five day)——知人者智,自知者明。
  • WPF 按钮背景色渐变
  • 飞算 JavaAI:给需求分析装上 “智能大脑“
  • VPS云服务器Linux性能分析与瓶颈解决方案设计
  • 机器学习 决策树案例电信用户流失
  • 豆包新模型+PromptPilot深度评测:提示词工程的智能化突破
  • Chrontel 【CH7104B-BF】CH7104B HDMI to HDTV/VGA Converter
  • SJW-app-1
  • 力扣热题100——双指针
  • Android GPU测试
  • 豹女篇章-人形态技能加攻速
  • 数据离不开哈希