当前位置: 首页 > news >正文

【Python】PySpark 数据计算 ② ( RDD#flatMap 方法 | RDD#flatMap 语法 | 代码示例 )

文章目录

  • 一、RDD#flatMap 方法
    • 1、RDD#flatMap 方法引入
    • 2、解除嵌套
    • 3、RDD#flatMap 语法说明
  • 二、代码示例 - RDD#flatMap 方法





一、RDD#flatMap 方法




1、RDD#flatMap 方法引入


RDD#map 方法 可以 将 RDD 中的数据元素 逐个进行处理 , 处理的逻辑 需要用外部 通过 参数传入 map 函数 ;

RDD#flatMap 方法在 RDD#map 方法 的基础上 , 增加了 " 解除嵌套 " 的作用 ;

RDD#flatMap 方法 也是 接收一个 函数 作为参数 , 该函数被应用于 RDD 中的每个元素及元素嵌套的子元素 , 并返回一个 新的 RDD 对象 ;


2、解除嵌套


解除嵌套 含义 : 下面的的 列表 中 , 每个元素 都是一个列表 ;

lst = [[1, 2], [3, 4, 5], [6, 7, 8]]

如果将上述 列表 解除嵌套 , 则新的 列表 如下 :

lst = [1, 2, 3, 4, 5, 6, 7, 8]

RDD#flatMap 方法 先对 RDD 中的 每个元素 进行处理 , 然后再 将 计算结果展平放到一个新的 RDD 对象中 , 也就是 解除嵌套 ;

这样 原始 RDD 对象 中的 每个元素 , 都对应 新 RDD 对象中的若干元素 ;


3、RDD#flatMap 语法说明


RDD#flatMap 语法说明 :

newRDD = oldRDD.flatMap(lambda x: [element1, element2, ...])

旧的 RDD 对象 oldRDD 中 , 每个元素应用一个 lambda 函数 , 该函数返回多个元素 , 返回的多个元素就会被展平放入新的 RDD 对象 newRDD 中 ;


代码示例 :

# 将 字符串列表 转为 RDD 对象
rdd = sparkContext.parallelize(["Tom 18", "Jerry 12", "Jack 21"])# 应用 map 操作,将每个元素 按照空格 拆分
rdd2 = rdd.flatMap(lambda element: element.split(" "))




二、代码示例 - RDD#flatMap 方法



代码示例 :

"""
PySpark 数据处理
"""# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext
# 为 PySpark 配置 Python 解释器
import os
os.environ['PYSPARK_PYTHON'] = "Y:/002_WorkSpace/PycharmProjects/pythonProject/venv/Scripts/python.exe"# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \.setMaster("local[*]") \.setAppName("hello_spark")# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)# 将 字符串列表 转为 RDD 对象
rdd = sparkContext.parallelize(["Tom 18", "Jerry 12", "Jack 21"])# 应用 map 操作,将每个元素 按照空格 拆分
rdd2 = rdd.flatMap(lambda element: element.split(" "))# 打印新的 RDD 中的内容
print(rdd2.collect())# 停止 PySpark 程序
sparkContext.stop()

执行结果 :

Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
23/07/31 23:02:58 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/31 23:02:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本号 :  3.4.1
['Tom', '18', 'Jerry', '12', 'Jack', '21']Process finished with exit code 0

在这里插入图片描述

http://www.lryc.cn/news/103137.html

相关文章:

  • 二叉树题目:左叶子之和
  • Spark SQL报错: Task failed while writing rows.
  • Linux系统下U盘打不开: No application is registered as handling this file
  • 07 定时器处理非活动连接(上)
  • python——案例四:判断字符串中的元素组成
  • 一起学算法(插入排序篇)
  • JVM基础篇-本地方法栈与堆
  • 防雷保护区如何划分,防雷分区概念LPZ介绍
  • 随手笔记——3D−3D:ICP求解
  • Python调用各大机器翻译API大全
  • 重生之我要学C++第六天
  • SpringBoot中ErrorPage(错误页面)的使用--【ErrorPage组件】
  • 【Android】APP网络优化学习笔记
  • 简单的知识图谱可视化+绘制nx.Graph()时报错TypeError: ‘_AxesStack‘ object is not callable
  • 【Matlab】基于粒子群优化算法优化BP神经网络的时间序列预测(Excel可直接替换数据)
  • 【机器学习】Cost Function for Logistic Regression
  • 【EI/SCOPUS会议征稿】2023年第四届新能源与电气科技国际学术研讨会 (ISNEET 2023)
  • 【计算机网络】10、ethtool
  • 什么是前端工程化?
  • 【深度学习】【三维重建】windows11环境配置tiny-cuda-nn详细教程
  • Matlab 一种自适应搜索半径的特征提取方法
  • 基于opencv的几种图像滤波
  • puppeteer代理的搭建和配置
  • 【简单认识MySQL的MHA高可用配置】
  • 【云原生】一文学会Docker存储所有特性
  • Android Ble蓝牙App(一)扫描
  • mac pd安装ubuntu并配置远程连接
  • 1.3 eureka+ribbon,完成服务注册与调用,负载均衡源码追踪
  • mysql修改字段长度是否锁表
  • SpringCloud集成OpenTelemetry的实现