当前位置: 首页 > news >正文

用SparkSQL和PySpark完成按时间字段顺序将字符串字段中的值组合在一起分组显示

用SparkSQL和PySpark完成以下数据转换。
源数据:
userid,page_name,visit_time
1,A,2021-2-1
2,B,2024-1-1
1,C,2020-5-4
2,D,2028-9-1

目的数据:
user_id,page_name_path
1,C->A
2,B->D

PySpark:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window# 初始化SparkSession(如果在已有环境中可以直接使用已有的spark对象)
spark = SparkSession.builder.appName("DataTransformation").getOrCreate()# 创建示例数据的DataFrame
data = [(1, "A", "2021-2-1"),(2, "B", "2024-1-1"),(1, "C", "2020-5-4"),(2, "D", "2028-9-1")
]
columns = ["userid", "page_name", "visit_time"]
df = spark.createDataFrame(data, columns)# 将visit_time转换为日期类型,方便后续排序
df = df.withColumn("visit_time", F.to_date(F.col("visit_time")))# 按照userid分区,根据visit_time排序创建窗口
window_spec = Window.partitionBy("userid").orderBy("visit_time")# 使用collect_list函数收集每个userid对应的page_name列表,然后使用concat_ws函数将其拼接为指定格式
result_df = df.withColumn("page_name_list", F.collect_list("page_name").over(window_spec)) \.groupBy("userid") \.agg(F.concat_ws("->", F.col("page_name_list")).alias("page_name_path")) \.select("userid", "page_name_path")# 重命名userid列为user_id(和目标数据列名一致)
result_df = result_df.withColumnRenamed("userid", "user_id")# 展示结果
result_df.show()

SparkSQL:

SELECT userid AS user_id,CONCAT_WS('->', collect_list(page_name) OVER (PARTITION BY userid ORDER BY visit_time)) AS page_name_path
FROM page_visits
GROUP BY userid
http://www.lryc.cn/news/507348.html

相关文章:

  • Sentinel 学习笔记3-责任链与工作流程
  • Latex 转换为 Word(使用GrindEQ )(英文转中文,毕业论文)
  • 使用Chat-LangChain模块创建一个与用户交流的机器人
  • 国家认可的人工智能从业人员证书如何报考?
  • 【网络云计算】2024第51周-每日【2024/12/17】小测-理论-解析
  • 每日十题八股-2024年12月19日
  • 网络方案设计
  • 学习记录:electron主进程与渲染进程直接的通信示例【开箱即用】
  • 【Java数据结构】ArrayList类
  • HDR视频技术之十:MPEG 及 VCEG 的 HDR 编码优化
  • 71 mysql 中 insert into ... on duplicate key update ... 的实现
  • 计算机网络-GRE Over IPSec实验
  • 你的第一个博客-第一弹
  • 若依启动项目时配置为 HTTPS 协议
  • 学习思考:一日三问(学习篇)之匹配VLAN
  • [WiFi] WiFi 802.1x介绍及EAP认证流程整理
  • 用C#(.NET8)开发一个NTP(SNTP)服务
  • Mybatis能执行一对一、一对多的关联查询吗?都有哪些实现方式,以及它们之间的区别
  • ABAP SQL 取日期+时间最新的一条数据
  • 【Rust自学】4.3. 所有权与函数
  • 【Redis分布式锁】高并发场景下秒杀业务的实现思路(集群模式)
  • 用docker快速安装电子白板Excalidraw绘制流程图
  • 使用Turtle库实现,鼠标左键绘制路径,用鼠标右键结束绘制,小海龟并沿路径移动
  • 人工智能入门是先看西瓜书还是先看花书?
  • winform中屏蔽双击最大化或最小化窗体(C#实现),禁用任务管理器结束程序,在需要屏蔽双击窗体最大化、最小化、关闭
  • 进程内存转储工具|内存镜像提取-取证工具
  • 数据结构day5:单向循环链表 代码作业
  • (OCPP服务器)SteVe编译搭建全过程
  • Mybatis分页插件的使用问题记录
  • 36. Three.js案例-创建带光照和阴影的球体与平面