PySpark 常用算子详解
PySpark 提供了丰富的算子用于分布式数据处理,主要分为转换算子(Transformations)和行动算子(Actions)。转换算子用于创建新的 RDD 或 DataFrame,而行动算子触发实际的计算并返回结果。
一、RDD 常用算子
1. 转换算子
转换算子是惰性的,不会立即执行,而是构建计算图。
1.1 基本转换
map(func)
对 RDD 中的每个元素应用函数func
。rdd = sc.parallelize([1, 2, 3]) squared = rdd.map(lambda x: x**2) # [1, 4, 9]
filter(func)
过滤出满足函数func
条件的元素。even_nums = rdd.filter(lambda x: x % 2 == 0) # [2]
flatMap(func)
先应用func
,再将结果展平。words = sc.parallelize(["Hello world", "Spark is fast"]) flat_words = words.flatMap(lambda x: x.split()) # ["Hello", "world", "Spark", "is", "fast"]
1.2 集合操作
union(other)
合并两个 RDD。rdd1 = sc.parallelize([1, 2]) rdd2 = sc.parallelize([3, 4]) union_rdd = rdd1.union(rdd2) # [1, 2, 3, 4]
intersection(other)
返回两个 RDD 的交集。rdd1 = sc.parallelize([1, 2, 3]) rdd2 = sc.parallelize([3, 4, 5]) intersect_rdd = rdd1.intersection(rdd2) # [3]
distinct()
去重。rdd = sc.parallelize([1, 1, 2, 2, 3]) distinct_rdd = rdd.distinct() # [1, 2, 3]
1.3 键值对操作
groupByKey()
按键分组。pairs = sc.parallelize([("a", 1), ("a", 2), ("b", 3)]) grouped = pairs.groupByKey() # [("a", [1, 2]), ("b", [3])]
reduceByKey(func)
按键聚合值。summed = pairs.reduceByKey(lambda x, y: x + y) # [("a", 3), ("b", 3)]
join(other)
键值对 RDD 的内连接。rdd1 = sc.parallelize([("a", 1), ("b", 2)]) rdd2 = sc.parallelize([("a", 3), ("a", 4)]) joined = rdd1.join(rdd2) # [("a", (1, 3)), ("a", (1, 4))]
2. 行动算子
行动算子触发计算并返回结果或写入外部存储。
2.1 基本行动
collect()
将 RDD 的所有元素收集到驱动程序。rdd = sc.parallelize([1, 2, 3]) result = rdd.collect() # [1, 2, 3]
count()
返回 RDD 的元素个数。count = rdd.count() # 3
take(n)
返回 RDD 的前 n 个元素。first_two = rdd.take(2) # [1, 2]
reduce(func)
使用函数func
聚合 RDD 元素。total = rdd.reduce(lambda x, y: x + y) # 6
2.2 保存操作
saveAsTextFile(path)
将 RDD 保存为文本文件。rdd.saveAsTextFile("hdfs://path/to/output")
二、DataFrame 常用算子
1. 转换算子
DataFrame 的转换算子基于关系代数,支持 SQL 风格操作。
1.1 选择与过滤
select(cols)
选择列。df.select("name", "age").show()
filter(condition)
过滤行。df.filter(df["age"] > 20).show()
where(condition)
等价于filter
df.where("age > 20").show()
1.2 聚合操作
groupBy(cols)
按列分组。df.groupBy("department").avg("salary").show()
agg(expressions)
自定义聚合。from pyspark.sql.functions import sum, avg df.agg(sum("sales"), avg("age")).show()
1.3 连接操作
join(other, on, how)
连接两个 DataFrame。df1.join(df2, on="id", how="inner").show()
1.4 排序与去重
sort(cols)
排序。df.sort("age", ascending=False).show()
dropDuplicates(subset)
去重。df.dropDuplicates(["name", "age"]).show()
2. 行动算子
show(n)
显示前 n 行。df.show(5)
count()
统计行数。rows = df.count()
collect()
收集所有行到驱动程序。data = df.collect()
toPandas()
转换为 Pandas DataFrame。pandas_df = df.toPandas()
三、SQL 函数
PySpark 提供了丰富的 SQL 函数,用于复杂的数据处理。
1. 数学函数
sum()
,avg()
,max()
,min()
,count()
round()
,sqrt()
,log()
,exp()
2. 字符串函数
concat()
,substring()
,lower()
,upper()
,trim()
split()
,regexp_replace()
3. 日期时间函数
current_date()
,current_timestamp()
date_format()
,year()
,month()
,dayofmonth()
4. 条件函数
when()
,otherwise()
ifnull()
,coalesce()
示例:
from pyspark.sql.functions import when, coldf.withColumn("age_group", when(col("age") < 18, "minor").when(col("age") < 60, "adult").otherwise("senior"))
四、窗口函数
窗口函数允许在特定行组上执行计算,无需分组。
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, row_numberwindow_spec = Window.partitionBy("department").orderBy("salary")# 添加排名列
df.withColumn("rank", rank().over(window_spec)).show()
五、算子执行顺序
PySpark 采用惰性计算:
- 转换算子构建 DAG(有向无环图)
- 行动算子触发 DAG 的执行
- 中间结果可能被缓存(使用
cache()
或persist()
)
示例:
rdd = sc.parallelize([1, 2, 3, 4])
rdd = rdd.map(lambda x: x**2).filter(lambda x: x > 5) # 转换
result = rdd.collect() # 行动,触发计算
六、性能优化建议
- 避免使用
collect()
:仅用于小数据结果,大数据会导致驱动程序内存溢出 - 使用广播变量:将小表广播到所有 Executor,减少 Shuffle
- 合理分区:通过
repartition()
或coalesce()
调整分区数 - 缓存重用数据:对需要多次使用的 RDD 或 DataFrame 使用
cache()
- 优先使用 DataFrame:比 RDD 更高效(基于 Catalyst 优化器)
通过掌握这些算子,你可以高效地处理和分析大规模数据集。在实际应用中,建议结合具体业务场景选择合适的算子,并注意性能调优。