当前位置: 首页 > news >正文

大数据-Spark批处理实用广播Broadcast构建一个全局缓存Cache

1、broadcast广播

在这里插入图片描述

在Spark中,broadcast是一种优化技术,它可以将一个只读变量缓存到每个节点上,以便在执行任务时使用。这样可以避免在每个任务中重复传输数据。

2、构建缓存

import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import com.alibaba.fastjson.JSONObject// 定义全局缓存单例对象
object GlobalCache extends Serializable {// 广播变量,用于存储缓存数据private var cacheData: Broadcast[collection.mutable.Map[String, JSONObject]] = _// 设置 SparkSession 和广播变量def setSparkSession(spark: SparkSession): Unit = {cacheData = spark.sparkContext.broadcast(collection.mutable.Map.empty[String, JSONObject])}// 按订单ID和用户ID缓存JSONObject对象def cacheJSONObject(orderId: String, userId: String, jsonObject: JSONObject): Unit = {// 获取广播变量的值并进行修改val data = cacheData.valuedata.synchronized {data.put(generateKey(orderId, userId), jsonObject)}}// 根据订单ID和用户ID删除缓存的JSONObject对象def removeJSONObject(orderId: String, userId: String): Unit = {// 获取广播变量的值并进行修改val data = cacheData.valuedata.synchronized {data.remove(generateKey(orderId, userId))}}// 根据订单ID和用户ID获取缓存的JSONObject对象def getJSONObjet(orderId: String, userId: String): JSONObject = {// 获取广播变量的值并进行访问val data = cacheData.valuedata.synchronized {data.get(generateKey(orderId, userId)).orNull}}// 生成缓存键,使用订单ID和用户ID拼接private def generateKey(orderId: String, userId: String): String = s"$orderId|$userId"
}

3、缓存测试

import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import com.alibaba.fastjson.JSONObject
import org.apache.log4j.{Level, Logger}object CacheTest {Logger.getLogger("org").setLevel(Level.ERROR)Logger.getRootLogger().setLevel(Level.ERROR) // 设置日志级别def addItem(orderId:String, userId:String, name:String): Unit = {val jsonObject = new JSONObject()jsonObject.put("name", name)// 缓存JSONObject对象GlobalCache.cacheJSONObject(orderId, userId, jsonObject)}def getCache(orderId: String, userId: String): JSONObject = {// 获取缓存的JSONObject对象GlobalCache.getJSONObjet(orderId, userId)}def delItem(orderId:String, userId:String): Unit = {// 删除缓存的JSONObject对象GlobalCache.removeJSONObject(orderId, userId)}def getSparkSession(appName: String, localType: Int): SparkSession = {val builder: SparkSession.Builder = SparkSession.builder().appName(appName)if (localType == 1) {builder.master("local[8]") // 本地模式,启用8个核心}val spark = builder.getOrCreate() // 获取或创建一个新的SparkSessionspark.sparkContext.setLogLevel("ERROR") // Spark设置日志级别spark}def main(args: Array[String]): Unit = {println("Start CacheTest")val spark: SparkSession = getSparkSession("CacheTest", 1)GlobalCache.setSparkSession(spark)  // 构造全局缓存addItem("001", "456", "苹果")      // 添加元素addItem("002", "789", "香蕉")      // 添加元素var cachedObject = getCache("001", "456")println(s"Cached Object: $cachedObject")delItem("001", "456")      // 删除元素cachedObject = getCache("001", "456")println(s"Cached Object: $cachedObject")spark.stop()}
}

4、控制台输出

Start CacheTest
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Cached Object: {"name":"苹果"}
Cached Object: nullProcess finished with exit code 0
http://www.lryc.cn/news/99458.html

相关文章:

  • Android Service的生命周期,两种启动方法,有什么区别
  • 测试开源C#人脸识别模块ViewFaceCore(5:质量检测和眼睛状态检测)
  • Go语言网络库net/http
  • Acwing.91 最短Hamilton路径(动态规划)
  • [hfut] [important] v4l2 vedio使用总结/opevx/ffpeg/v4l2/opencv/cuda
  • 2023年深圳杯数学建模A题影响城市居民身体健康的因素分析
  • 指令调度(Instruction Scheduling)
  • 【运维】Linux 跨服务器复制文件文件夹
  • k8s apiserver如何支持http访问?
  • Safetensors,高效安全易用的深度学习新工具
  • Unity 工具之 NuGetForUnity 包管理器,方便在 Unity 中的进行包管理的简单使用
  • 运算放大器(二):恒流源
  • 企业选择租用CRM还是一次性买断CRM?分别有哪些优势?
  • VBA_MF系列技术资料1-133
  • Android 项目架构
  • 【Linux】进程通信 — 管道
  • ROS 2 — 托管(生命周期)节点简介
  • vue2企业级项目(六)
  • OSPF的选路原则
  • 4.操作元素属性
  • uniapp 微信小程序:v-model双向绑定问题(自定义 props 名无效)
  • 【Lua学习笔记】Lua进阶——Table(3) 元表
  • AI编程常用工具 Jupyter Notebook
  • RocketMQ重复消费的解决方案::分布式锁直击面试!
  • 如何降低TCP在局域网环境下的数据传输延迟
  • 【LeetCode】78.子集
  • 认可功能介绍 - 技术声誉靠认可
  • EtherNet/IP转CAN网关can协议标准
  • 解决代理IP负载均衡与性能优化的双重挑战
  • 深度探索 Elasticsearch 8.X:function_score 参数解读与实战案例分析