当前位置: 首页 > news >正文

spark DStream从不同数据源采集数据(RDD 队列、文件、diy 采集器、kafka)(scala 编程)

目录

1. RDD队列

2 textFileStream

3 DIY采集器

4 kafka数据源【重点】


1. RDD队列

       a、使用场景:测试
       b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送这个队列的RDD,都会作为一个DStream处理

    val  sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")val ssc = new StreamingContext(sparkconf,Seconds(3))// 创建一个队列对象,队列中存放的是RDDval queue = new mutable.Queue[RDD[String]]()// 通过队列创建DStreamval queueDS: InputDStream[String] = ssc.queueStream(queue)queueDS.print()// 启动采集器ssc.start()//这个操作之所以放在这个位置,是为了模拟流式的感觉,数据源源不断的生产for(i <- 1 to 5 ){// 循环创建rddval rdd: RDD[String] = ssc.sparkContext.makeRDD(List(i.toString))// 将RDD存放到队列中queue.enqueue(rdd)// 当前线程休眠1秒Thread.sleep(6000)         }// 等待采集器的结束ssc.awaitTermination()}

2 textFileStream

   val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("textFileStream")val ssc = new StreamingContext(sparkConf,Seconds(3))//从文件中读取数据val textDS: DStream[String] = ssc.textFileStream("in")textDS.print()// 启动采集器ssc.start()// 等待采集器的结束ssc.awaitTermination()

3 DIY采集器

    1. 自定义采集器
    2. 什么情况下需要自定采集器呢?
         比如从mysql、hbase中读取数据。
         采集器的作用是从指定的地方,按照采集周期对数据进行采集。
         目前有:采集kafka、采集netcat工具的指定端口的数据、采集文件目录中的数据等
    3. 自定义采集器的步骤,模仿socketTextStream
         a、自定采集器类,继承extends,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别
         b、重写onStart和onStop方法
            onStart:采集器的如何启动
            onStop:采集的如何停止

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DIY")val ssc = new StreamingContext(sparkConf, Seconds(3))// 获取采集的流val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReciver("localhost",9999))ds.print()ssc.start()ssc.awaitTermination()}// 继承extends Reciver,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别class MyReciver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {private var socket: Socket = _def receive = {// 获取输入流val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,"UTF-8"))// 设定一个间接变量var s: String = nullwhile (true) {// 按行读取数据s = reader.readLine()if (s != null) {// 将数据进行封装store(s)}}}// 1. 启动采集器override def onStart(): Unit = {socket = new Socket(host, port)new Thread("Socket Receiver") {setDaemon(true)override def run() {receive}}.start()}// 2. 停止采集器override def onStop(): Unit = {socket.close()socket = null}}

4 kafka数据源【重点】

-- DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。
-- 配置信息基本上是固定写法

 // TODO Spark环境// SparkStreaming使用核数最少是2个val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")val ssc = new StreamingContext(sparkConf, Seconds(3))// TODO 使用SparkStreaming读取Kafka的数据// Kafka的配置信息val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop105:9092,hadoop106:9092,hadoop107:9092",ConsumerConfig.GROUP_ID_CONFIG -> "atguigu","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))// 获取数据,key是null,value是真实的数据val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())valueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()ssc.start()// 等待采集器的结束ssc.awaitTermination()

 

http://www.lryc.cn/news/198757.html

相关文章:

  • 【三:Mock服务的使用】
  • 驱动:驱动相关概念,内核模块编程,内核消息打印printk函数的使用
  • 【Qt控件之QListWidget】介绍及使用,利用QListWidget、QToolButton、和布局控件实现抽屉式组合控件
  • 【Java基础面试二十四】、String类有哪些方法?
  • [DRAFT] LLVM ThinLTO原理分析
  • 使用Gitlab构建简单流水线CI/CD
  • 【AIGC核心技术剖析】用于高效 3D 内容创建生成(从单视图图像生成高质量的纹理网格)
  • nginx平滑升级添加echo模块、localtion配置、rewrite配置
  • 系统架构师备考倒计时19天(每日知识点)
  • 谈谈 Redis 如何来实现分布式锁
  • .NET 6.0 Web API Hangfire
  • 基于java的校园论坛系统,ssm+jsp,Mysql数据库,前台用户+后台管理,完美运行,有一万多字论文
  • Django小白开发指南
  • 保序回归与金融时序数据
  • 基于单片机设计的家用自来水水质监测装置
  • ubuntu20.04运用startup application开机自启动python程序
  • SpringBoot整合Caffeine实现缓存
  • DVWA-弱会话IDS
  • 【C++中cin、cin.get()、cin.getline()、getline() 的区别】
  • SSH连接华为交换机慢
  • Web攻防03_MySQL注入_数据请求
  • JS加密/解密那些必须知道的事儿
  • 搭建伪分布式Hadoop
  • 【C++】特殊类的设计(只在堆、栈创建对象,单例对象)
  • 分类预测 | MATLAB实现基于GRU-AdaBoost门控循环单元结合AdaBoost多输入分类预测
  • 【Spring Boot项目】根据用户的角色控制数据库访问权限
  • EthernetIP 转MODBUS RTU协议网关连接FANUC机器人作为EthernetIP通信从站
  • 如何注册微信小程序
  • 移动App安全检测的必要性,app安全测试报告的编写注意事项
  • DVWA-JavaScript Attacks