Spark 3:Spark Core RDD持久化
RDD 的数据是过程数据
RDD 的缓存
# coding:utf8
import timefrom pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.textFile("../data/input/words.txt")rdd2 = rdd1.flatMap(lambda x: x.split(" "))rdd3 = rdd2.map(lambda x: (x, 1))# 缓存到内存中rdd3.cache()# 先放内存,如果不够则放硬盘,2份副本rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)rdd4 = rdd3.reduceByKey(lambda a, b: a + b)print(rdd4.collect())rdd5 = rdd3.groupByKey()rdd6 = rdd5.mapValues(lambda x: sum(x))print(rdd6.collect())# 清理缓存rdd3.unpersist()time.sleep(100000)
RDD 的CheckPoint
# coding:utf8
import timefrom pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 1. 告知spark, 开启CheckPoint功能sc.setCheckpointDir("hdfs://node1:8020/output/ckp")rdd1 = sc.textFile("../data/input/words.txt")rdd2 = rdd1.flatMap(lambda x: x.split(" "))rdd3 = rdd2.map(lambda x: (x, 1))# 调用checkpoint API 保存数据即可rdd3.checkpoint()rdd4 = rdd3.reduceByKey(lambda a, b: a + b)print(rdd4.collect())rdd5 = rdd3.groupByKey()rdd6 = rdd5.mapValues(lambda x: sum(x))print(rdd6.collect())rdd3.unpersist()time.sleep(100000)
Cache和Checkpoint区别
Cache是轻量化保存RDD数据,可存储在内存和硬盘,是分散存储,设计上数据是不安全的(保留RDD血缘关系)。
CheckPoint是重量级保存RDD数据,是集中存储,只能存储在硬盘(HDFS)上,设计上是安全的(不保留RDD血缘关系)。
Cache 和 CheckPoint的性能对比?
Cache性能更好,因为是分散存储,各个Executor并行执行,效率高,可以保存到内存中(占内存),更快。
CheckPoint比较慢,因为是集中存储,涉及到网络IO,但是存储到HDFS上更加安全(多副本) 。
案例练习:搜索引擎日志分析
# coding:utf8# 导入Spark的相关包
import timefrom pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
from defs import context_jieba, filter_words, append_words, extract_user_and_word
from operator import addif __name__ == '__main__':# 0. 初始化执行环境 构建SparkContext对象conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 1. 读取数据文件file_rdd = sc.textFile("hdfs://node1:8020/input/SogouQ.txt")# 2. 对数据进行切分 \tsplit_rdd = file_rdd.map(lambda x: x.split("\t"))# 3. 因为要做多个需求, split_rdd 作为基础的rdd 会被多次使用.split_rdd.persist(StorageLevel.DISK_ONLY)# TODO: 需求1: 用户搜索的关键`词`分析# 主要分析热点词# 将所有的搜索内容取出# print(split_rdd.takeSample(True, 3))context_rdd = split_rdd.map(lambda x: x[2])# 对搜索的内容进行分词分析words_rdd = context_rdd.flatMap(context_jieba)# print(words_rdd.collect())# 院校 帮 -> 院校帮# 博学 谷 -> 博学谷# 传智播 客 -> 传智播客filtered_rdd = words_rdd.filter(filter_words)# 将关键词转换: 传智播 -> 传智播客final_words_rdd = filtered_rdd.map(append_words)# 对单词进行 分组 聚合 排序 求出前5名result1 = final_words_rdd.reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(5)print("需求1结果: ", result1)# TODO: 需求2: 用户和关键词组合分析# 1, 我喜欢传智播客# 1+我 1+喜欢 1+传智播客user_content_rdd = split_rdd.map(lambda x: (x[1], x[2]))# 对用户的搜索内容进行分词, 分词后和用户ID再次组合user_word_with_one_rdd = user_content_rdd.flatMap(extract_user_and_word)# 对内容进行 分组 聚合 排序 求前5result2 = user_word_with_one_rdd.reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(5)print("需求2结果: ", result2)# TODO: 需求3: 热门搜索时间段分析# 取出来所有的时间time_rdd = split_rdd.map(lambda x: x[0])# 对时间进行处理, 只保留小时精度即可hour_with_one_rdd = time_rdd.map(lambda x: (x.split(":")[0], 1))# 分组 聚合 排序result3 = hour_with_one_rdd.reduceByKey(add).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\collect()print("需求3结果: ", result3)time.sleep(100000)import jiebadef context_jieba(data):"""通过jieba分词工具 进行分词操作"""seg = jieba.cut_for_search(data)l = list()for word in seg:l.append(word)return ldef filter_words(data):"""过滤不要的 谷 \ 帮 \ 客"""return data not in ['谷', '帮', '客']def append_words(data):"""修订某些关键词的内容"""if data == '传智播': data = '传智播客'if data == '院校': data = '院校帮'if data == '博学': data = '博学谷'return (data, 1)def extract_user_and_word(data):"""传入数据是 元组 (1, 我喜欢传智播客)"""user_id = data[0]content = data[1]# 对content进行分词words = context_jieba(content)return_list = list()for word in words:# 不要忘记过滤 \谷 \ 帮 \ 客if filter_words(word):return_list.append((user_id + "_" + append_words(word)[0], 1))return return_list
提交到集群运行
为什么要在全部的服务器安装jieba库?
因为YARN是集群运行,Executor可以在所有服务器上执行,所以每个服务器都需要有jieba库提供支撑。
如何尽量提高任务计算的资源?
计算CPU核心和内存量,通过--executor-memory 指定executor内存,通过--executor-cores 指定
executor的核心数,通过--num-executors 指定总executor数量。