当前位置: 首页 > news >正文

Spark Streaming 数据流处理

一、创建Spark Streaming 环境

二、读取数据(监听端口)

三、任务处理

四、启动程序

我这里写的是简单的单词数量统计

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}object Demo1WordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("wc")val sc = new SparkContext(conf)//1、创建spark streaming环境//指定处理的间隔时间val ssc = new StreamingContext(sc, Durations.seconds(5))//2、读取数据//nc -lk 8888//yum install ncval linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)//统计单词的数量val countDS: DStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)countDS.print()//启动spark streaming程序ssc.start()ssc.awaitTermination()}
}

UpdateStateByKey(有状态算子)能统计之前的单词数量,可做实时更新 

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object Demo2UpdateStateByKey {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("wc")val sc = new SparkContext(conf)//1、创建spark streaming环境//指定处理的间隔时间val ssc = new StreamingContext(sc, Durations.seconds(5))//设置checkpoint路径//用于保存状态ssc.checkpoint("data/checkpoint")//2、读取数据//nc -lk 8888//yum install ncval linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)//统计单词的数量val kvDS: DStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))//updateStateByKey(有状态算子): 每一次计算更新每一个key的状态(单词的数量)val countDS: DStream[(String, Int)] = kvDS.updateStateByKey {/*** seq: 当前批次一个key所有value* state: 之前的结果(状态:之前的单词的数量)*/case (seq: Seq[Int], state: Option[Int]) =>println(seq)println(state)//计算当前批次单词的数量val sum: Int = seq.sum//获取之前单词的数量val count: Int = state match {case Some(count) => countcase None => 0}//计算新的单词的数量并返回Option(sum + count)}countDS.print()//启动spark streaming程序ssc.start()ssc.awaitTermination()}
}

http://www.lryc.cn/news/466747.html

相关文章:

  • 高效规划神器 markmap:一键将 Markdown 变思维导图!
  • 微服务基础架构(图)
  • 中电金信:大模型时代 金融机构企业架构转型如何更智能化?
  • 基于CRNN模型的多位数字序列识别的应用【代码+数据集+python环境+GUI系统】
  • windows中命令行批处理脚本学习
  • 版本工具报错:Error Unity Version Control
  • ECharts饼图-饼图标签对齐,附视频讲解与代码下载
  • Python实现基于WebSocket的stomp协议调试助手工具分享
  • 《语音识别方案选型研究》
  • 解决关于HTML+JS + Servlet 实现前后端请求Session不一致的问题
  • ECharts饼图-饼图34,附视频讲解与代码下载
  • 如何实现安川MP3300运动控制器与西门子1200系列PLC进行ModbusTCP通讯
  • react18中如何实现同步的setState来实现所见即所得的效果
  • 深入理解MVP架构模式
  • Java面试题七
  • linux网络编程3——http服务器的实现和性能测试
  • Docker部署Kamailio,并使用LinPhone实现网络通话
  • JAVA-石头迷阵小游戏
  • 鸿蒙--进度条通知
  • 搜维尔科技:varjo xr-4开箱测评,工业用途头显,一流视觉保真度
  • mysql数据量分库分表
  • Vite创建Vue3项目以及Vue3相关基础知识
  • Elasticsearch封装公共索引增删改查
  • Python异常检测:Isolation Forest与局部异常因子(LOF)详解
  • Git的原理和使用(二)
  • docker 发布镜像
  • 投了15亿美元,芯片创新公司Ampere为何成了Oracle真爱?
  • vue 报告标题时间来自 elementUI的 el-date-picker 有开始时间和结束时间
  • 简单几何问题的通解
  • DBeaver导出数据表结构和数据,导入到另一个环境数据库进行数据更新