当前位置: 首页 > news >正文

作业03-SparkSQL开发

作业03-SparkSQL开发


一、数据读取与转换操作

  1. 从HDFS路径/dw/accounts读取accounts数据文件,并选取id、address(第6列)字段,创建DataFrame,名为acctdf(字段名分别为userid、address)。
val acct = sc.textFile("/dw/accounts/*")val acctdf = acctRDD.map(line => {val fields = line.split(",")(fields(0), fields(5)) // 提取id和address
}).toDF("userid", "address")acctdf.printSchema()
acctdf.show(2)

从HDFS路径/dw/accounts/读取所有文件,创建RDD[String]
每行是一个字符串,代表一条账户记录对每条记录进行映射转换:
使用,分割字符串
提取第1个字段(id)和第6个字段(address)首先输入的是line,经过split被分割程数组fields
再将fields(0)当做key,fields(5)当做value输出为一个pairRdd将RDD转换为DataFrame,并命名列名为"userid"和"address"
在调用 toDF() 之后,它就变成了一个普通的DataFrame,不再保留键值对的特性。

    转换过程:第一行: "1,2008-10-23...,2275 Washburn Street,..."
    split(",") → Array("1", "2008-10-23...", ..., "2275 Washburn Street", ...)
    取第0个和第5个元素 → ("1", "2275 Washburn Street")
    1. 上传日志数据$DATA_EXERCISE/weblogs到HDFS的/dw/weblogs目录,从HDFS路径/dw/weblogs读取日志数据,提取用户id(第3个字段),统计每个用户的请求次数,生成(userid, reqs)的RDD。
    2. 将上述RDD转换为DataFrame,名为userdf,字段名为userid、reqs。
    // 2. 处理weblogs数据,创建userdf DataFrame
    val logsRDD = sc.textFile("/dw/weblogs")
    // 提取用户id(第3个字段)并统计每个用户的请求次数
    val userReqRDD = logsRDD.map(line => {val fields = line.split(" ")(fields(2), 1) // (userid, 1)
    }).reduceByKey(_ + _) // 按userid聚合计算总请求数// 将RDD转换为DataFrame
    val userdf = spark.createDataFrame(userReqRDD.map{case (userid, reqs) => Row(userid, reqs)}, StructType(Seq(StructField("userid", StringType, false),StructField("reqs", IntegerType, false)))// 显示userdf结构
    userdf.printSchema()
    userdf.show(2)
    对每条日志记录进行映射转换:
    使用空格分割字符串
    提取第3个字段(userid)作为键,值为1
    使用reduceByKey对相同userid的记录进行求和,得到每个用户的总请求数map 操作:
    输入:logsRDD 的每一行 line(假设是日志字符串)
    对每行按空格分割成数组 fields
    提取 fields(2) 作为 key(假设是 userid),1 作为 value
    → 生成一个 PairRDD[(String, Int)],形式为 (userid, 1)
    reduceByKey 操作:
    对相同 userid(key)的所有 1(value)进行求和(_ + _)
    → 最终得到每个 userid 的总请求次数,形式为 (userid, total_requests)
    结果:
    userReqRDD 是一个不可变的 PairRDD,内容为 (userid, 总请求数)。将RDD转换为DataFrame:
    使用Row对象封装每行数据
    定义Schema:userid(字符串类型,非空)和reqs(整数类型,非空)map{case (userid, reqs) => Row(userid, reqs)}
    对 userReqRDD 的每个键值对进行映射,将 (userid, reqs) 转换为 Spark 的 Row 对象。
    Row 是 DataFrame 中一行数据的容器,类似于数据库中的一行记录。
    例如:("user1", 5) → Row("user1", 5)

    代码的等价简化写法

    如果觉得 Row 和 StructType 的写法较复杂,可以用更简洁的方式实现相同功能:

    val userdf = userReqRDD.toDF("userid", "reqs")

    转换过程:
    第一行: "34.28.1.122 - 65255 [01/Mar/2014..."
    split(" ") → Array("34.28.1.122", "-", "65255", "[01/Mar/2014...", ...)
    取第2个元素 → ("65255", 1)
    对相同userid的值求和 → ("65255", 2)

    提示:

    使用spark.createDataFrame实现将RDD转换为DataFrame

    使用toDF来更名


    二、数据关联与分析操作

    1. 将userdf与acctdf按用户id进行关联,生成包含userid、reqs、address字段的新DataFrame。
    2. 对关联结果进行过滤,仅保留请求次数大于5次的用户。
    3. 按照reqs字段降序排列并显示出前10行数据
    // 1. 关联两个 DataFrame
    val joinedDF = userdf.join(acctdf, "userid")// 2. 过滤请求次数 > 5 的用户
    val filteredDF = joinedDF.filter("reqs > 5")// 3. 按 reqs 降序排序,并显示前 10 行
    val resultDF = filteredDF.orderBy($"reqs".desc).limit(10)
    resultDF.show()


    http://www.lryc.cn/news/582770.html

    相关文章:

  1. 无缝矩阵的音频合成与音频分离功能详解
  2. Rust BSS段原理与实践解析
  3. RustFS一款Rust 驱动的 高性能 分布式存储系统
  4. Modbus TCP转Profinet网关实现视觉相机与西门子PLC配置实例研究
  5. Tomcat:启用https(Windows)
  6. 传输层协议TCP、UDP
  7. CI/CD — DevOps概念之实现k8s持续交付持续集成(一)
  8. 数据结构:位图
  9. IDEA Maven报错 无法解析 com.taobao:parent:pom:1.0.1【100%解决 此类型问题】
  10. 分布式光纤传感:为储能安全保驾护航
  11. 广度优先与深度优先遍历核心逻辑理解及实践
  12. 关于 scrapy框架 详解
  13. OpenCV在Visual Studio 2022下的配置
  14. Android 中的多线程编程全面解析
  15. 【机器学习笔记 Ⅲ】5 强化学习
  16. 【docker】linux CentOS docker 安装流程
  17. Centos和麒麟系统如何每天晚上2点10分定时备份达梦数据库
  18. Redis:高性能内存数据库与缓存利器
  19. java内存缓存实现 与 redis缓存实现 (ConcurrentHashMap 应用)
  20. Mac安装Docker(使用orbstack代替)
  21. 从深度学习的角度看自动驾驶
  22. ubuntu24.04(vmware workstation 17.6pro)无法安装vmtools的问题解决
  23. Using Spring for Apache Pulsar:Quick Tour
  24. 短视频矩阵管理平台的崛起:源头厂商的深度解析
  25. Rust 的 Copy 语义:深入浅出指南
  26. huggingface笔记:文本生成Text generation
  27. 【Node.js】文本与 pdf 的相互转换
  28. 在 Linux(openEuler 24.03 LTS-SP1)上安装 Kubernetes + KubeSphere 的防火墙放行全攻略
  29. 京东携手HarmonyOS SDK首发家电AR高精摆放功能
  30. 代码详细注释:嵌入式Linux LCD汉字显示程序(基于font.h字库头文件)