当前位置: 首页 > news >正文

Spark 3:Spark Core RDD持久化

RDD 的数据是过程数据

b8da17ba7f4f4942b11bb5f211111136.png

RDD 的缓存

a8536b8352164f449bcb9d6c550cfe4a.png

# coding:utf8
import timefrom pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.textFile("../data/input/words.txt")rdd2 = rdd1.flatMap(lambda x: x.split(" "))rdd3 = rdd2.map(lambda x: (x, 1))# 缓存到内存中rdd3.cache()# 先放内存,如果不够则放硬盘,2份副本rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)rdd4 = rdd3.reduceByKey(lambda a, b: a + b)print(rdd4.collect())rdd5 = rdd3.groupByKey()rdd6 = rdd5.mapValues(lambda x: sum(x))print(rdd6.collect())# 清理缓存rdd3.unpersist()time.sleep(100000)

06100dee265d4fe587b2abfeb2291e1f.png

e68c6e5996a148c2991e884258f5bc4a.png

103ceba93d6c42daa77820cb985257f8.png

RDD 的CheckPoint

fa83627f868d444db9e8f3b5efc06e8d.png

# coding:utf8
import timefrom pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 1. 告知spark, 开启CheckPoint功能sc.setCheckpointDir("hdfs://node1:8020/output/ckp")rdd1 = sc.textFile("../data/input/words.txt")rdd2 = rdd1.flatMap(lambda x: x.split(" "))rdd3 = rdd2.map(lambda x: (x, 1))# 调用checkpoint API 保存数据即可rdd3.checkpoint()rdd4 = rdd3.reduceByKey(lambda a, b: a + b)print(rdd4.collect())rdd5 = rdd3.groupByKey()rdd6 = rdd5.mapValues(lambda x: sum(x))print(rdd6.collect())rdd3.unpersist()time.sleep(100000)

6297822106df4f648b5759fd320c6ca8.png

961fa643346945a1b842bddda7222db3.png

Cache和Checkpoint区别
Cache是轻量化保存RDD数据,可存储在内存和硬盘,是分散存储,设计上数据是不安全的(保留RDD血缘关系)。
CheckPoint是重量级保存RDD数据,是集中存储,只能存储在硬盘(HDFS)上,设计上是安全的(不保留RDD血缘关系)。
Cache 和 CheckPoint的性能对比?
Cache性能更好,因为是分散存储,各个Executor并行执行,效率高,可以保存到内存中(占内存),更快。
CheckPoint比较慢,因为是集中存储,涉及到网络IO,但是存储到HDFS上更加安全(多副本) 。

案例练习:搜索引擎日志分析

0dd1f3dcea24466fa400012cb8541847.png

bcdb47e47d32466aa25847a43dd57614.png

60c99356504f4f02aa731a890bfd9597.png

# coding:utf8# 导入Spark的相关包
import timefrom pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
from defs import context_jieba, filter_words, append_words, extract_user_and_word
from operator import addif __name__ == '__main__':# 0. 初始化执行环境 构建SparkContext对象conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 1. 读取数据文件file_rdd = sc.textFile("hdfs://node1:8020/input/SogouQ.txt")# 2. 对数据进行切分 \tsplit_rdd = file_rdd.map(lambda x: x.split("\t"))# 3. 因为要做多个需求, split_rdd 作为基础的rdd 会被多次使用.split_rdd.persist(StorageLevel.DISK_ONLY)# TODO: 需求1: 用户搜索的关键`词`分析# 主要分析热点词# 将所有的搜索内容取出# print(split_rdd.takeSample(True, 3))context_rdd = split_rdd.map(lambda x: x[2])# 对搜索的内容进行分词分析words_rdd = context_rdd.flatMap(context_jieba)# print(words_rdd.collect())# 院校 帮 -> 院校帮# 博学 谷 -> 博学谷# 传智播 客 -> 传智播客filtered_rdd = words_rdd.filter(filter_words)# 将关键词转换: 传智播 -> 传智播客final_words_rdd = filtered_rdd.map(append_words)# 对单词进行 分组 聚合 排序 求出前5名result1 = final_words_rdd.reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(5)print("需求1结果: ", result1)# TODO: 需求2: 用户和关键词组合分析# 1, 我喜欢传智播客# 1+我  1+喜欢 1+传智播客user_content_rdd = split_rdd.map(lambda x: (x[1], x[2]))# 对用户的搜索内容进行分词, 分词后和用户ID再次组合user_word_with_one_rdd = user_content_rdd.flatMap(extract_user_and_word)# 对内容进行 分组 聚合 排序 求前5result2 = user_word_with_one_rdd.reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(5)print("需求2结果: ", result2)# TODO: 需求3: 热门搜索时间段分析# 取出来所有的时间time_rdd = split_rdd.map(lambda x: x[0])# 对时间进行处理, 只保留小时精度即可hour_with_one_rdd = time_rdd.map(lambda x: (x.split(":")[0], 1))# 分组 聚合 排序result3 = hour_with_one_rdd.reduceByKey(add).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\collect()print("需求3结果: ", result3)time.sleep(100000)import jiebadef context_jieba(data):"""通过jieba分词工具 进行分词操作"""seg = jieba.cut_for_search(data)l = list()for word in seg:l.append(word)return ldef filter_words(data):"""过滤不要的 谷 \ 帮 \ 客"""return data not in ['谷', '帮', '客']def append_words(data):"""修订某些关键词的内容"""if data == '传智播': data = '传智播客'if data == '院校': data = '院校帮'if data == '博学': data = '博学谷'return (data, 1)def extract_user_and_word(data):"""传入数据是 元组 (1, 我喜欢传智播客)"""user_id = data[0]content = data[1]# 对content进行分词words = context_jieba(content)return_list = list()for word in words:# 不要忘记过滤 \谷 \ 帮 \ 客if filter_words(word):return_list.append((user_id + "_" + append_words(word)[0], 1))return return_list

提交到集群运行

4bda8984859e4bdb8760a429a5209708.png

e35dfa2d151241b5a10bd5d21198ad38.png为什么要在全部的服务器安装jieba库?
因为YARN是集群运行,Executor可以在所有服务器上执行,所以每个服务器都需要有jieba库提供支撑。
如何尽量提高任务计算的资源?
计算CPU核心和内存量,通过--executor-memory 指定executor内存,通过--executor-cores 指定
executor的核心数,通过--num-executors 指定总executor数量。

 

http://www.lryc.cn/news/89160.html

相关文章:

  • 字节跳动五面都过了,结果被刷了,问了hr原因竟说是...
  • Python日期带时区转换工具类总结
  • 视频会议产品对比分析
  • 每日一练 | 华为认证真题练习Day47
  • ChatIE(LLM大模型用于信息抽取)
  • 提升企业管理效率的利器——ADManager Plus
  • 《入侵的艺术》读书心得:第六章:渗透测试中的智慧与愚昧
  • SAP-MM-采购申请-价值特性
  • 设计模式 - 代理模式
  • IOC初始化 IOC启动阶段 (Spring容器的启动流程)
  • Java后端入职第四天,就被要求代码回退(Git回退实战)
  • 【swing】SplitPanel
  • 网络货运平台源码 管理平台端+司机端APP+货主端APP源码
  • Yarn学习笔记
  • 智能路由器开发之OpenWrt简介
  • Linux音频和视频命令速查表
  • 脉蜂:Django + Flutter 开发的进销存管理系统【已开源】
  • 树的先序,中序,后序递归遍历
  • 如何在Linux中更改SSH端口?
  • 合创视觉科技UI设计师就业发展前景怎么样?薪资待遇如何?
  • VB一个可以改变箭头方向的气泡提示
  • STM8、STM8S003F3P6 双机串口通信(片上串口)
  • FPGA基于AXI 1G/2.5G Ethernet Subsystem实现千兆UDP通信 提供工程源码和技术支持
  • 机器学习基础知识之多模型性能对比评价方法
  • 对敏感信息脱敏,如对姓名、证件号码、手机号码、银行卡号码进行脱敏
  • 创建型——单例模式C++实现
  • 【华为OD统一考试B卷 | 100分】执行时长(C++ Java JavaScript Python)
  • 操作系统原理 —— 内存管理的概念(十八)
  • GPT-4国内怎么用
  • 搭建LightPicture开源免费图床系统「公网远程控制」