当前位置: 首页 > news >正文

spark stream入门案例:netcat准实时处理wordCount(scala 编程)

目录

案例需求

代码

结果

解析


         案例需求:

        使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

        -- 1. Spark从socket中获取数据:一行一行的获取
        -- 2. Driver程序执行时,streaming处理过程不能结束
        -- 3. 采集器在正常情况下启动后就不应该停止,除非特殊情况
        -- 4. 采集器位于一个executor中,是一个线程,执行时需要一个核,如果设定的总核数为1时,那么在运行时因为没有核数,所以不会有打印结果,所以sparkStreaming使用的核数至少为2个
        -- 5. print()方法,默认是打印10行结果
        -- 6. netcat的指令:
 

      在Windows下:nc -lp 9999在linux下: nc -lk 9999

        代码: 
package cn.olo.streamimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamDemo {def main(args: Array[String]): Unit = {// 连接SparkStreamingval sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming")/*1.方法:StreamingContext(形参)2.形参:形参1:conf: SparkConf:spark配置对象形参2:batchDuration: Duration:采集时间*/val ssc = new StreamingContext(sparkConf,Seconds(5))// 需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数// 1. 获取netcat工具9999端口的连接,并开始接收数据// 从socket中获取数据:一行一行的获取val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost",9999)// 2. 数据处理val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))val wordToSumDS: DStream[(String, Int)] = wordDS.map((_,1)).reduceByKey(_ + _ )// 3. 打印数据wordToSumDS.print()// 4. Driver程序执行时,streaming处理过程不能结束// 采集器在正常情况下启动后就不应该停止,除非特殊情况// 启动采集器ssc.start()// 等待采集器的结束ssc.awaitTermination()}}

结果:

解析:

        a、采集周期时间之间,每一个采集周期生成一个RDD,按照时间的顺序依次进行
        b、在每一个采集周期内,会执行wordcount计算,最终得出:统计出每一个采集周期时间的wordcount

http://www.lryc.cn/news/196258.html

相关文章:

  • Ansible基础及模块
  • Atlassian Confluence OGNL表达式注入RCE CVE-2021-26084
  • 【c语言】编译链接--详解
  • 国家开放大学 训练题
  • 【灵动 Mini-G0001开发板】+Keil5开发环境搭建+ST-Link/V2程序下载和仿真+4颗LED100ms闪烁。
  • 同为科技(TOWE)关于风力发电雷电防护的解决方案
  • gorm 中的事务运用
  • maven 新建模块 导入后 按Ctrl 点不进新建模块pom定义
  • idea使用debug无法启动,使用run可以启动
  • 进程的虚拟地址空间
  • 做web自动化测试遇到Chrome浏览器老是自动更新,怎么办 ? 这里提供两个解决办法 。
  • 腾讯HR面试
  • 过滤器(Filter)和拦截器(Interceptor)有什么不同?
  • Spring 注解 @Qualifier 详解
  • 实现更低功耗R5F51406BDNE、R5F51406ADFK、R5F51406ADFL、R5F51406AGFN搭载RXv2内核的32位微控制器
  • 通信系统中ZF,ML,MRC以及MMSE四种信号检测算法误码率matlab对比仿真
  • Redis数据结构之listpack
  • VMware 配置记录
  • 【Java基础面试十四】、 封装的目的是什么,为什么要有封装?
  • 阿里云2023年双十一优惠活动整理
  • HTML标签详解 HTML5+CSS3+移动web 前端开发入门笔记(四)
  • lenovo联想笔记本ThinkPad系列T15p或P15v Gen3(21DA,21DB,21D8,21D9)原厂Win11系统镜像
  • 【SpringBoot】拦截器(Interceptor)的使用
  • CS鱼饵制作
  • 问题记录1 json解析问题
  • std::move以及右值引用等
  • 分享一个比对图片是否一致的小工具(来源: github)
  • 编写AA程序需要做以下几个步骤:
  • jmeter接口测试使用rsa加密解密算法
  • IDEA通过Docker插件部署SpringBoot项目