当前位置: 首页 > news >正文

13-pyspark的共享变量用法总结

目录

    • 前言
    • 广播变量
      • 广播变量的作用
      • 广播变量的使用方式
    • 累加器
    • 累加器的作用
      • 累加器的优缺点
      • 累加器的使用方式


PySpark实战笔记系列第四篇

  • 10-用PySpark建立第一个Spark RDD(PySpark实战笔记系列第一篇)
  • 11-pyspark的RDD的变换与动作算子总结(PySpark实战笔记系列第二篇))
  • 12-pyspark的RDD算子注意事项总结(PySpark实战笔记系列第三篇)
  • 13-pyspark的共享变量用法总结(PySpark实战笔记系列第四篇)

前言

spark提供两种特定的共享方式:广播变量累加器

广播变量

广播变量允许程序缓存一个只读变量在集群的每个机器上。广播变量就是普通变量的一个包装变量。

广播变量的作用

可以用一种更高效的方式来共享一些数据,比如一个全局配置文件,可以通过广播变量共享给所有节点。

广播变量的使用方式

创建:通过调用SparkContext.broadcast()方法来将一个普通变量创建为一个广播变量。

访问:通过value方法来访问。

更新:通过unpersist()方法声明更新,然后修改原始变量的值,通过再次广播从而被其他节点获取。

销毁:通过destroy()方法可以把广播变量的数据和元数据一起销毁掉,销毁后不能再使用。

# 示例
import findspark
findspark.init()
##############################################
from pyspark.sql impot SparkSession
spark = SparkSession.builder \.master("local[2]") \.appName("broadcastDemo") \.getOrCreate();
sc = spark.SparkContext
##############################################
ip_mes = {"ip":"127.0.0.2","key":"password"}
# 创建广播变量
brVar = sc.broadcast(ip_mes)# 获取广播变量的值
val = brVar.value
# {"ip":"127.0.0.2","key":"password"}
print(val)
# password
print(val["key"])
# 更新广播变量
brVar.unpersist()
ip_mes["key"] = "admin"
brVar = sc.broadcast(ip_mes) #再次广播
# 获取广播后的变量值
val = brVar.value
# {"ip":"127.0.0.2","key":"admin"}
print(val)
# 销毁广播变量
brVar.destroy()
##############################################
sc.stop()

累加器

除了广播变量进行变数共享外,Spark还提供了一种累加器用于在集群中共享数据。。Spark原生支持数值类型的累加器,开发人员可以根据自己的需求来支持其他数据类型。

累加器的作用

一个常见的作用是:在调试时对作业的执行过程中的相关事件进行计数

累加器的优缺点

优点:能够快速执行操作。

缺点:只能利用关联操作做“加”操作的变量。

累加器的使用方式

创建:通过SparkContext.accumulator()方法来创建出累加器对象。

访问:通过value方法来访问。

更新:不同节点上的计算任务都可以利用add方法或者使用**+=操作**来给累加器加值。

注意事项

  • 累加器是一种只可加的变量对象,比如不能执行-=操作
  • 使用累加器时,为了保证准确性,只能使用一次动作操作。如果需要使用多次动作操作,则在RDD对象上执行cache或persist操作来切断依赖。
# 示例
import findspark
findspark.init()
##############################################
from pyspark.sql impot SparkSession
spark = SparkSession.builder \.master("local[2]") \.appName("broadcastDemo") \.getOrCreate();
sc = spark.SparkContext
##############################################
rdd = sc.range(1,101)
# 创建累加器,初始值0
acc = sc.accumulator(0)
def countEnve(x):global accif x%2 == 0:acc +=1 # 累加器更新
rdd_count = rdd.map(countEnve)
# 获取累加器值
# 0 因为未执行动作操作,即countEnve函数的逻辑还未执行
print(acc.value)
"""
保证多次正确获取累加器值,否则当我们再次执行rdd_count.count(),
累加器会再次执行。
rdd_counter.persist()切断了动作操作的链条,因此只会执行一次。
"""
rdd_count.persist()
# 100
print(rdd_count.count())
# 50
print(acc.value)# 100
print(rdd_count.count())
# 50
print(acc.value)
##############################################
sc.stop()

ps:上述示例代码,待实际反复运行!确认其运行过程。


参考文档:

  • https://spark.apache.org/docs/latest/api/python/reference/pyspark.html
  • 《Python大数据处理库PySpark实战》

博主写博文就是方便对自己所学所做的事做一备份记录或回顾总结。欢迎留言,沟通学习。

刚开始接触,请多指教,欢迎留言交流!

http://www.lryc.cn/news/337565.html

相关文章:

  • BI数据分析软件:行业趋势与功能特点剖析
  • centos7上docker搭建vulhub靶场
  • Flutter入门指南
  • keepalived脑裂问题
  • 【Linux笔记】编mysql库
  • vscode远程免密登录ssh
  • 2024年MathorCup数模竞赛C题详解
  • 【简单讲解如何安装与配置Composer】
  • 深入理解Apache ZooKeeper与Kafka的协同工作原理
  • js解密心得,记录一次抓包vue解密过程
  • redis-哨兵模式
  • 自动化测试中的SOLID原则
  • tencentcloud-sdk-python-iotexplorer和tencent-iot-device有什么区别
  • Spring day1
  • 设计模式: 行为型之中介者模式(18)
  • 计算机网络的起源与发展历程
  • 2024-4-12-实战:商城首页(下)
  • 一、flask入门和视图
  • Selenium+Chrome Driver 爬取搜狐页面信息
  • SpringBoot:一个注解就能帮你下载任意对象
  • oracle全量、增量备份
  • React Router 5 vs 6:使用上的主要差异与升级指南
  • 基于LNMP部署wordpress
  • openGauss_5.1.0 企业版快速安装及数据库连接:单节点容器化安装
  • 微信小程序 uniapp+vue城市公交线路查询系统dtjl3
  • 2024年MathorCup数模竞赛B题问题一二三+部分代码分享
  • Ubuntu日常配置
  • GMSSL-通信
  • linux 磁盘分区Inode使用率达到100%,导致网站无法创建文件报错 failed:No space leftondevice(
  • 探索Python库的奇妙世界