当前位置: 首页 > news >正文

Spark-Streaming集成Kafka

 Spark Streaming集成Kafka是生产上最多的方式,其中集成Kafka 0.10是较为简单的,即:Kafka分区和Spark分区之间是1:1的对应关系,以及对偏移量和元数据的访问。与高版本的Kafka Consumer API 集成时做了一些调整,下面我们一起来看看吧。

一、创建一个Direct Stream

导入相关maven依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.5.3</version>
</dependency>

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeclass KafkaDriectStream {def main(args: Array[String]): Unit = {// 创建一个具有2个线程和1秒批处理间隔的本地StreamingContext。val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaDriectStream")val ssc = new StreamingContext(conf, Seconds(1))val kafkaParams = Map[String, Object]("bootstrap.servers" -> "cdh1:9092,cdh2:9092,cdh3:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean))val topics = Array("topicA", "topicB")val inputDStream :InputDStream[ConsumerRecord[String, String]]= KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))inputDStream.map(record => (record.key, record.value))}
}

如果Spark批处理持续时间大于默认的Kafka心跳会话超时时间(30秒),请适当增加heartbeat.interval.ms和session.timeout.ms。对于大于5分钟的批处理,这将需要更改代理上的group.max.session.timeout.ms。

二、executor选择适合分区处理

新的Kafka Consumer API会将消息预取到缓冲区中。因此,出于性能原因,Spark集成Kafka时最好将缓存的Consumer 保留在executor上(而不是为每个批次重新创建它们)。

在大多数情况下,应该使用LocationStrategies.PreferConsistent。这将在可用的executor之间均匀地分配分区。如果executor与Kafka 的broker位于相同的主机上,则使用PreferBrokers,这将在该分区的Kafka leader上安排分区。最后,如果分区之间的负载严重偏差,请使用PreferFixed。这允许指定分区到主机的显式映射(任何未指定的分区都将使用一致的位置)。

Consumer 缓存的默认最大大小为64。如果处理超过(64个executor数量)的Kafka分区,可以通过更改spark.streaming.kafka.consumer.cache.maxCapacity设置。

如果想禁用Consumer 的缓存,可以将spark.streaming.kafka.consumer.cache.enabled 设置成false

缓存由topic分区和group.id控制,因此对createDirectStream的每次调用使用单独的 group.id

三、根据topic、partition、offset创建RDD

// 导入依赖关系并创建kafka-params,例如第一步:创建Direct Streamval offsetRanges = Array(// topic, partition, 包含起始offset, 不包含结束offsetOffsetRange("test", 0, 0, 100),OffsetRange("test", 1, 0, 100)
)//根据kafka TopicPartition 中的一段数据来创建一个RDD,这是不是为了实现微批来提供支持呢
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)

请注意,这里不能指定broker来消费,因为spark streaming的Driver Consumer 可以自动查找broker的元数据。如果要指定broker,需要将其与元数据绑定到一起。

四、获取offset

stream.foreachRDD { rdd =>val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd.foreachPartition { iter =>val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")}
}

请注意,HasOffsetRanges的类型转换只有在createDirectStream结果调用的第一个方法中完成时才会成功,而不是在后面的方法链中完成。因为一旦发生shuffle和重分区,RDD分区和Kafka分区之间的一对一关系就会遭到破坏。

五、存储offset

在kafka中为了实现精确一次的语义,必须把结果处理和offset放到一个事务中去处理,在与spark streaming集成时也不例外。必须在幂等输出之后存储offset,或者将offset与输出一起存储在原子事务中。

offset可以存储在spark的checkpoint中,也可以存储在kafka自身的内部topic中。将offset存储到kafka的好处是,无论应用程序代码发生什么变化,Kafka都是一个持久的存储。但是,Kafka不是事务性的,程序的输出必须仍然是幂等的。注意,在流式计算中我们一般会将enable.auto.commit置为false。采用手动提交的方式。

stream.foreachRDD { rdd =>val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges// 一段时间后,在输出完成之后,提交offsetstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

与HasOffsetRanges一样,只有在createDirectStream的结果上调用时,才能成功得到CanCommitOffsets ,而不是在转换之后。获取到CanCommitOffsets 一般要等这批数据处理完再进行提交。

// 从提交到数据库的偏移量开始
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMapval stream = KafkaUtils.createDirectStream[String, String](streamingContext,PreferConsistent,Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)stream.foreachRDD { rdd =>val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesval results = yourCalculation(rdd)// 开启事务// 更新结果// 更新offset// 结束事务
}

六、官方例子

object DirectKafkaWordCount {def main(args: Array[String]): Unit = {if (args.length < 3) {System.err.println(s"""|Usage: DirectKafkaWordCount <brokers> <groupId> <topics>|  <brokers> is a list of one or more Kafka brokers|  <groupId> is a consumer group name to consume from topics|  <topics> is a list of one or more kafka topics to consume from|""".stripMargin)System.exit(1)}StreamingExamples.setStreamingLogLevels()val Array(brokers, groupId, topics) = args// 以2秒的批处理间隔创建上下文val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")val ssc = new StreamingContext(sparkConf, Seconds(2))//指定kafka、topic信息创建direct kafka streamval topicsSet = topics.split(",").toSetval kafkaParams = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,ConsumerConfig.GROUP_ID_CONFIG -> groupId,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])val messages = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))// 获取一行数据并进行分割、统计、打印val lines = messages.map(_.value)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)wordCounts.print()//启动计算ssc.start()ssc.awaitTermination()}
}

该例子消费Kafka中一个或多个topic的消息并进行单词统计,需要三个参数:1、Kafka broker的列表,2、消费者组,3、以逗号分隔的topic列表

1、创建2个topic

kafka-topics --create --topic spark-streaming-wc1 --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
kafka-topics --create --topic spark-streaming-wc2 --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2

2、启动程序

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example org.apache.spark.examples.streaming.DirectKafkaWordCount cdh1:9092,cdh2:9092 direct-kafka-wc-group spark-streaming-wc1,spark-streaming-wc2

3、向topic推送数据

kafka-console-producer --topic spark-streaming-wc1 --broker-list cdh1:9092,cdh2:9092,cdh3:9092
kafka-console-producer --topic spark-streaming-wc2 --broker-list cdh1:9092,cdh2:9092,cdh3:9092

4、查看结果


大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)

  • 广州
  • https://ais.cn/u/fi2yym

第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)

  • 青岛
  • https://ais.cn/u/nuQr6f

第六届大数据与信息化教育国际学术会议(ICBDIE 2025)

  • 苏州
  • https://ais.cn/u/eYnmQr

第三届通信网络与机器学习国际学术会议(CNML 2025)

  • 南京
  • https://ais.cn/u/vUNva2
http://www.lryc.cn/news/508306.html

相关文章:

  • 移植 OLLVM 到 Android NDK,Android Studio 中使用 OLLVM
  • DAY36|动态规划Part04|LeetCode:1049. 最后一块石头的重量 II、494. 目标和、474.一和零
  • Linux 下SVN新手操作手册
  • 障碍感知 | 基于KD树的障碍物快速处理(附案例分析与ROS C++仿真)
  • Electron -- Electron Fiddle(一)
  • 详解Redis的常用命令
  • elasticache备份
  • Tomcat负载均衡全解析
  • [LeetCode-Python版] 定长滑动窗口8——2461. 长度为 K 子数组中的最大和
  • springboot476基于vue篮球联盟管理系统(论文+源码)_kaic
  • 预约参观华为基地,见证行业巅峰
  • 【Flink-scala】DataSet编程模型介绍及数据源
  • Odrive源码分析(四) 位置爬坡算法
  • [Unity Shader][图形渲染] Shader数学基础11 - 复合变换详解
  • 使用Python实现智能家居控制系统:开启智慧生活的钥匙
  • 使用 HTML5 Canvas 实现动态蜈蚣动画
  • 计算机视觉目标检测——DETR(End-to-End Object Detection with Transformers)
  • uniapp .gitignore
  • JavaWeb Servlet的反射优化、Dispatcher优化、视图(重定向)优化、方法参数值获取优化
  • 备忘一个FDBatchMove数据转存的问题
  • CEF127 编译指南 MacOS 篇 - 编译 CEF(六)
  • 【更新】LLM Interview
  • Django 视图中使用 Redis 缓存优化查询性能
  • 正则表达式解析与功能说明
  • STUN服务器实现NAT穿透
  • 音视频入门基础:MPEG2-TS专题(19)——FFmpeg源码中,解析TS流中的PES流的实现
  • tomcat的安装以及配置(基于linuxOS)
  • 因子分解(递归)
  • 【Python】pandas库---数据分析
  • RabbitMQ 的7种工作模式