当前位置: 首页 > news >正文

7.7晚自习作业

实操作业02:Spark核心开发

作业说明

  • 请严格按照步骤操作,并将最终结果文件(命名为:sparkcore_result.txt)于20点前上传
  • 结果文件需包含每一步的关键命令执行结果文本输出。

一、数据读取与转换操作

  1. 上传账户数据$DATA_EXERCISE/accounts到HDFS的/dw/accounts目录,从HDFS路径/dw/accounts读取accounts数据文件
hadoop fs -mkdir -p /dw/accounts
hadoop fs -put $DATA_EXERCISE/accounts /dw/accounts/

  1. 将每行数据按逗号分割成字段数组
  2. 以邮政编码字段(第9个字段)作为key,创建键值对RDD
  3. 查看转换后的数据结构,显示前2条记录

step1. 创建 RDD(读取所有 part 文件)

// 使用通配符 * 读取目录下所有 part 文件
val accountsRDD = sc.textFile("hdfs://master:8020/dw/accounts/accounts/part-*")// 验证数据加载
accountsRDD.take(2).foreach(println)

step2:数据转换

// 一、数据转换操作
val zipKeyValueRDD = accountsRDD.map(line => {val fields = line.split(",")  // 按逗号分割每行数据val zipCode = fields(8)      // 提取第9个字段(索引8)作为邮政编码(zipCode, fields)            // 返回(邮编, 字段数组)的键值对
})// 显示前2条
zipKeyValueRDD.take(2).foreach { case (zip, fields) => println(s"Key: $zip | Value: ${fields.mkString(",")}")
}

为什么可以确定line是一个数组属性的变量呢

解析:
line: 是一个 String 类型的变量,表示CSV文件的某一行(例如:"1,John,Doe,10001")
.split(","):
是String类的方法,按指定的分隔符(这里是逗号 ,)切割字符串
返回一个 Array[String](字符串数组)
val fields:
定义一个 不可变变量(Scala中用val声明)
类型由编译器自动推断为 Array[String]
存储切割后的字段数组可以通过Spark-shell直接验证类型:
// 示例数据
val line = "1,John,Doe,10001"
// 执行切割
val fields = line.split(",")  // 得到:Array("1", "John", "Doe", "10001")
// 查看类型
println(fields.getClass)  // 输出:class [Ljava.lang.String; (表示String数组)

这三行代码也只能输出一行数据的转换结果,但是这个是一整个文件的转换结果,应该是有循环在里面的,循环体现在哪呢

这里的关键在于 Spark的分布式计算模型 和 RDD的转换操作 的运作机制。让我们深入解析这个"隐式循环"的实现原理:

1. 表面现象 vs 底层实现

你看到的代码(看似单行处理)

scala

val zipKeyValueRDD = accountsRDD.map(line => {val fields = line.split(",")  // 似乎只处理一行val zipCode = fields(8)(zipCode, fields)
})

实际发生的处理(分布式并行处理)

python

# 伪代码:Spark集群的实际执行逻辑
for partition in accountsRDD.partitions:    # 遍历所有数据分区for line in partition:                   # 分区内逐行处理(真正的循环)fields = line.split(",")emit((fields[8], fields))             # 输出键值对

2. 循环的隐藏位置

Spark通过以下机制实现"隐式循环":

组件作用类比传统编程
RDD.map()定义转换逻辑相当于循环体
Partitions(分区)数据分片相当于外层循环
Tasks(任务)并行处理单元相当于多线程循环

3. 完整数据处理流程

  1. 输入阶段

    • 假设原始文件有100万行,被Spark自动划分为N个分区(如HDFS的block数)

  2. 并行执行阶段

    • 每个分区的数据会被不同的Executor进程处理

    • 每个Executor对分配到的分区数据 逐行执行map函数

  3. 输出阶段

    • 所有节点的处理结果合并成新的RDD

4. 关键问题解答

Q: 为什么代码没有显式循环却能处理整个文件?

  • Spark的RDD操作是声明式的:你只需要定义"做什么"(业务逻辑),Spark框架自动处理"怎么做"(分布式循环)

  • 数据分区和并行计算

    scala

    // 假设文件被分为3个分区
    accountsRDD.partitions.size  // 返回分区数

    每个分区会触发一个Task,每个Task内部会循环处理其分配到的数据

Q: 如何验证确实处理了所有数据?

scala

// 查看处理后的记录数
zipKeyValueRDD.count() 

// 查看分区处理情况(需在Spark UI观察)
zipKeyValueRDD.toDebugString


5. 与传统编程的对比

特性Spark RDD传统Java/Python
循环方式隐式(由框架管理)显式(for/while)
并行度自动分区并行需手动线程/进程管理
数据规模支持TB/PB级通常单机内存限制
代码示例rdd.map(x => f(x))for x in list: f(x)
  • 循环确实存在:但被Spark框架隐藏在分布式运行时中

  • 你的代码是"循环体"map中的逻辑会被应用到每一行数据

  • 优势:开发者无需关心并行化和数据分发,专注业务逻辑

二、数据聚合操作

  1. 对上述RDD提取每个记录的姓名字段:
  2. 提取第5个字段(first_name)和第4个字段(last_name)
  3. 将姓和名用逗号连接
val nameByZipRDD = zipKeyValueRDD.mapValues(fields => s"${fields(4)},${fields(3)}"  // 格式化为"姓,名"
)

    在Scala中,s"${fields(4)},${fields(3)}" 是一种称为 字符串插值(String Interpolation) 的语法

    1. 字符串插值的组成

    部分含义示例
    开头的s表示启用字符串插值s"..."
    ${}插入变量/表达式的语法${fields(4)}
    引号内内容固定字符串+动态变量组合"姓,名"

    2. 具体到代码

    scala

    s"${fields(4)},${fields(3)}"
    • 等效的普通写法

      scala

      fields(4) + "," + fields(3)  // 直接字符串拼接

    • 执行过程

      1. 取出数组fields的第5个元素(索引4)

      2. 取出第4个元素(索引3)

      3. 用逗号连接两者

    3. 对比其他语言

    语言类似语法示例
    Scalas"${var}"s"Hello, ${name}"
    Pythonf-stringf"Hello, {name}"
    JavaScript模板字符串`Hello, ${name}`

    1. map vs mapValues 的本质区别

    操作函数签名输入 → 输出在你的代码中的应用
    map(T) => U整个元素 → 新元素line => (zipCode, fields)
    mapValues(V) => U仅值部分 → 新值(键不变)fields => "姓,名"

    2.代码中两个阶段的解析

    (1)第一阶段:数据转换 (map)

    scala

    val zipKeyValueRDD = accountsRDD.map(line => {val fields = line.split(",")       // String → Array[String]val zipCode = fields(8)           // 提取key(zipCode, fields)                 // 返回: (String, Array[String])
    })
    • line => 的含义:

      • 输入:原始字符串(如 "1,John,Doe,10001"

      • 输出:完全新建的键值对 (String, Array[String])

    • 数据流

      text

      "1,John,Doe,10001" → split → ["1","John","Doe","10001"] → 取fields(8)作为key → 输出 ("10001", ["1","John","Doe","10001",...])
    (2)第二阶段:聚合 (mapValues)

    scala

    val nameByZipRDD = zipKeyValueRDD.mapValues(fields => s"${fields(4)},${fields(3)}"  // 仅修改value部分
    )
    • fields => 的含义:

      • 输入:已有键值对的值部分(即之前的 Array[String]

      • 输出:仅更新值(键 zipCode 保持不变)

    • 数据流

      text

      输入: ("10001", ["1","John","Doe","10001",...])→ 提取fields(4)和fields(3) → 输出 ("10001", "Doe,John")  // 键未改变!

    3. => 的本质

    • => 是Scala中的函数定义符号,表示:

      scala

      val func: InputType => OutputType = (input) => { // 处理input output 
      }
    • 在代码中:

      • line => ...:定义了一个从 String 到 (String, Array[String]) 的函数

      • fields => ...:定义了一个从 Array[String] 到 String 的函数

    1. 按邮政编码分组
    2. 查看聚合结果,显示前2条记录
    val groupedByNameRDD = nameByZipRDD.groupByKey()// 显示前2组
    groupedByNameRDD.take(2).foreach {case (zip, names) => println(s"$zip -> ${names.mkString("; ")}")
    }

    三、数据排序与展示

    1. 对分组后的RDD按邮政编码进行升序排列
    2. 取前5条记录进行展示
    3. 对每条记录,先打印邮政编码,然后打印该邮政编码下的所有姓名列表
    groupedByNameRDD.sortByKey().take(5).foreach {case (zip, names) =>println(s"\n=== 邮政编码: $zip ===")names.foreach(println)
    }


    四、提交要求

    1. 代码和结果文件:将代码及其执行后的输出结果保存到sparkcore_result.txt文件中

    2. 结果文件应包含

    3. 数据读取与转换操作的代码和输出结果
    4. 数据聚合操作的代码和输出结果
    5. 数据排序与展示的代码和输出结果

    http://www.lryc.cn/news/582368.html

    相关文章:

  1. 【Behavior Tree】-- 行为树AI逻辑实现- Unity 游戏引擎实现
  2. Kafka生产者的初始化
  3. 【人工智能】ChatGPT、DeepSeek-R1、DeepSeek-V3 辨析
  4. 20250707-4-Kubernetes 集群部署、配置和验证-kubeconfig_笔记
  5. Maven依赖与JRebel热部署一站式解决方案
  6. Java 命令行参数详解:系统属性、JVM 选项与应用配置
  7. 【牛客算法】游游的整数切割
  8. c语言中的函数VII
  9. 回溯题解——子集【LeetCode】输入的视角(选或不选)
  10. 机器学习知识
  11. 独立开发A/B测试实用教程
  12. Docker 稳定运行与存储优化全攻略(含可视化指南)
  13. LeetCode 151. 反转字符串中的单词
  14. TCP长连接保持在线状态
  15. 人工智能-基础篇-23-智能体Agent到底是什么?怎么理解?(智能体=看+想+做)
  16. 数据中台架构解析:湖仓一体的实战设计
  17. 计算阶梯电费
  18. C盘瘦身 -- 虚拟内存文件 pagefile.sys
  19. Go defer(二):从汇编的角度理解延迟调用的实现
  20. MIL-STD-1553B总线
  21. NLP自然语言处理 02 RNN及其变体
  22. 【Java面试】Https和Http的区别?以及分别的原理是什么?
  23. 【应急响应】Linux 自用应急响应工具(LinuxCheckShoot)
  24. 【力扣(LeetCode)】数据挖掘面试题0003: 356. 直线镜像
  25. 明星AI自动化测试工具Midscene.js源码解析
  26. Vidwall: 支持将 4K 视频设置为动态桌面壁纸,兼容 MP4 和 MOV 格式
  27. 【保姆级图文详解】探秘 Prompt 工程:AI 交互的关键密码
  28. 【Netty基础】Java原生网络编程
  29. 熔断限流降级
  30. [附源码+数据库+毕业论文]基于Spring+MyBatis+MySQL+Maven+jsp实现的高校实验室资源综合管理系统,推荐!