当前位置: 首页 > news >正文

Spark Streaming的核心功能及其示例PySpark代码

Spark Streaming是Apache Spark中用于实时流数据处理的模块。以下是一些常见功能的实用PySpark代码示例:

  1. 基础流处理:从TCP套接字读取数据并统计单词数量
from pyspark import `SparkContext
from pyspark.streaming import StreamingContext# 创建SparkContext和StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)  # 1秒的批处理间隔# 创建一个DStream,从TCP源读取数据
lines = ssc.socketTextStream("localhost", 9999)# 对每一行数据进行分词,映射为(word, 1)的键值对,然后按单词统计数量
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)# 打印每个RDD中的前10个元素
word_counts.pprint()# 启动流计算
ssc.start()
# 等待流计算结束
ssc.awaitTermination()

在上述代码中:

  • sc 是 SparkContext ,用于与Spark集群交互。
  • ssc 是 StreamingContext ,定义了批处理间隔。
  • lines 是一个 DStream ,从指定的TCP套接字读取数据。
  • words 对每行数据进行分词, word_counts 统计每个单词出现的次数。
  • pprint 方法打印每个批次的前10个元素。
  1. 使用窗口函数
from pyspark import SparkContext
from pyspark.streaming import StreamingContextsc = SparkContext("local[2]", "WindowedWordCount")
ssc = StreamingContext(sc, 1)lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))# 使用窗口函数,窗口大小为3秒,滑动间隔为1秒
windowed_word_counts = word_counts.reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 3, 1)windowed_word_counts.pprint()ssc.start()
ssc.awaitTermination()

在这个示例中:

  • reduceByKeyAndWindow 方法用于在窗口上进行聚合操作。
  • 第一个参数是用于合并窗口内元素的函数,第二个参数是用于移除窗口外元素的函数。
  1. 状态更新
from pyspark import SparkContext
from pyspark.streaming import StreamingContextsc = SparkContext("local[2]", "StatefulWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")  # 启用检查点def updateFunction(new_values, running_count):if running_count is None:running_count = 0return sum(new_values, running_count)lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))# 使用updateStateByKey进行状态更新
stateful_word_counts = word_counts.updateStateByKey(updateFunction)stateful_word_counts.pprint()ssc.start()
ssc.awaitTermination()

在上述代码中:

  • updateStateByKey 方法用于维护每个键的状态。
  • updateFunction 定义了如何根据新值和现有状态更新状态。
  1. 与Kafka集成
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtilssc = SparkContext("local[2]", "KafkaWordCount")
ssc = StreamingContext(sc, 1)# Kafka参数
kafkaParams = {"metadata.broker.list": "localhost:9092"}
topics = ["test"]# 创建Kafka输入DStream
kvs = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)
lines = kvs.map(lambda x: x[1])words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)word_counts.pprint()ssc.start()
ssc.awaitTermination()

在这个示例中:

  • KafkaUtils.createDirectStream 用于从Kafka主题读取数据。
  • kvs 是一个包含Kafka消息的DStream, lines 提取消息内容。
http://www.lryc.cn/news/523027.html

相关文章:

  • 自动驾驶占用网格预测
  • 力扣动态规划-2【算法学习day.96】
  • 软考高级5个资格、中级常考4个资格简介及难易程度排序
  • 2.5 如何评估表示学习
  • Linux-day08
  • stack_queue的底层,模拟实现,deque和priority_queue详解
  • LabVIEW 实现线路板 PCB 可靠性测试
  • sqlfather笔记
  • RabbitMQ(四)
  • 【Unity3D】远处的物体会闪烁问题(深度冲突) Reversed-Z
  • 探索与创作:2024年CSDN平台上的成长与突破
  • QT笔记- Qt6.8.1 Android编程 添加AndroidManifest.xml文件以支持修改权限
  • 【Leetcode 每日一题 - 扩展】421. 数组中两个数的最大异或值
  • 计算机网络 | IP地址、子网掩码、网络地址、主机地址计算方式详解
  • C#如何调用执行命令行窗口(CMD)
  • vim练级攻略(精简版)
  • 一文速通Java的JDBC编程
  • laravel中请求失败重试的扩展--Guzzle
  • 如何在vue中渲染markdown内容?
  • Mysql MVCC
  • Spring6.0新特性-HTTP接口:使用@HttpExchange实现更优雅的Http客户端
  • springboot医院信管系统
  • 迅为RK3568开发板篇OpenHarmony实操HDF驱动控制LED-编写内核 LED HDF 驱动程序
  • [javaWeb]初识Web
  • 复健第二天之[MoeCTF 2022]baby_file
  • uniapp 微信小程序 editor 富文本编辑器
  • SparkSQL函数
  • 从零开始学数据库 day2 DML
  • 电脑换固态硬盘
  • 【大数据】机器学习------支持向量机(SVM)