
Spark SQL: custom sort order for collect_list within groups

To control the sort order of values aggregated with group by + concat_ws() (or collect_list) in Spark SQL, the following approaches can be used.
The original data is:

+---+-----+----+
|id |name |type|
+---+-----+----+
|1  |name1|p   |
|2  |name2|p   |
|3  |name3|p   |
|1  |x1   |q   |
|2  |x2   |q   |
|3  |x3   |q   |
+---+-----+----+

The target output is:

+----+---------------------+
|type|value_list           |
+----+---------------------+
|p   |[name3, name2, name1]|
|q   |[x3, x2, x1]         |
+----+---------------------+
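Before looking at the Spark variants, the intended transformation can be sketched in plain Python (this is only an illustration of the semantics, not Spark code): group the rows by type, sort each group by id in descending order, and keep the names.

```python
# Plain-Python sketch of the target transformation (not Spark):
# group by type, sort each group by id descending, project the names.
from collections import defaultdict

rows = [(1, "name1", "p"), (2, "name2", "p"), (3, "name3", "p"),
        (1, "x1", "q"), (2, "x2", "q"), (3, "x3", "q")]

groups = defaultdict(list)
for id_, name, typ in rows:
    groups[typ].append((id_, name))

result = {typ: [name for _, name in sorted(pairs, key=lambda p: -p[0])]
          for typ, pairs in groups.items()}
print(result)  # {'p': ['name3', 'name2', 'name1'], 'q': ['x3', 'x2', 'x1']}
```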

In spark-shell:

val df=Seq((1,"name1","p"),(2,"name2","p"),(3,"name3","p"),(1,"x1","q"),(2,"x2","q"),(3,"x3","q")).toDF("id","name","type")
df.show(false)

1. Using a window function

df.createOrReplaceTempView("test")
spark.sql("""
  select type, max(c) as c1
  from (
    select type,
           concat_ws('&', collect_list(trim(name)) over (partition by type order by id desc)) as c
    from test
  ) as x
  group by type
""")

Because window functions are comparatively resource-intensive, this approach tends to be slow on large data volumes, so consider the alternatives below.
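Why does the query above need the outer max(c)? With an ORDER BY in the window specification, collect_list is evaluated over a growing frame, so each row in a partition sees a cumulative list; since each concatenated string is a prefix of the next, the lexicographic max recovers the complete one. A plain-Python sketch of that behavior (not Spark code):

```python
# Sketch: with ORDER BY in the window, collect_list is cumulative per row.
# Each string is a prefix of the next, so max() picks the complete list.
names_desc = ["name3", "name2", "name1"]  # partition 'p' ordered by id desc
cumulative = ["&".join(names_desc[:i + 1]) for i in range(len(names_desc))]
print(cumulative)       # ['name3', 'name3&name2', 'name3&name2&name1']
full = max(cumulative)  # in a prefix chain, the lexicographic max is the longest
print(full)             # name3&name2&name1
```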

2. Using struct together with sort_array(array, asc: true/false), which is more efficient:

val df3=spark.sql("select type, concat_ws('&',sort_array(collect_list(struct(id,name)),false).name) as c from test group by type ")
df3.show(false)
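The trick here is that sort_array compares structs field by field, so putting id first in the struct makes it the sort key, and passing false sorts descending; the .name at the end projects the name field back out. The same semantics in a plain-Python sketch (not Spark code):

```python
# Mimic of sort_array(collect_list(struct(id, name)), false).name:
# tuples compare field by field (id first); reverse=True plays the role
# of the `false` (descending) flag; the final list is the .name projection.
structs = [(1, "name1"), (2, "name2"), (3, "name3")]  # collect_list of (id, name)
sorted_desc = sorted(structs, reverse=True)           # sort_array(..., false)
names = [name for _, name in sorted_desc]             # .name projection
print("&".join(names))  # name3&name2&name1
```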

For example, suppose we want to compute a result of the form:

user_id    stk_id:action_type:amount:price:time   stk_id:action_type:amount:price:time   stk_id:action_type:amount:price:time   stk_id:action_type:amount:price:time 

where the actions must be sorted ascending by time. Then:

Dataset<Row> splitStkView = session.sql(
        "select client_id, innercode, entrust_bs, business_amount, business_price, trade_date from\n" +
        "(select client_id,\n" +
        "       split(action,':')[0] as innercode,\n" +
        "       split(action,':')[1] as entrust_bs,\n" +
        "       split(action,':')[2] as business_amount,\n" +
        "       split(action,':')[3] as business_price,\n" +
        "       split(action,':')[4] as trade_date,\n" +
        "       ROW_NUMBER() OVER(PARTITION BY split(action,':')[0] ORDER BY split(action,':')[4] DESC) AS rn\n" +
        "from stk_temp)\n" +
        "where rn <= 5000");
splitStkView.createOrReplaceTempView("splitStkView");

Dataset<Row> groupStkView = session.sql(
        "select client_id, CONCAT(innercode, ':', entrust_bs, ':', business_amount, ':', business_price, ':', trade_date) as behive, trade_date from splitStkView");
groupStkView.createOrReplaceTempView("groupStkView");

Dataset<Row> resultData = session.sql(
        "SELECT client_id, concat_ws('\t', sort_array(collect_list(struct(trade_date, behive)), true).behive) as behives FROM groupStkView GROUP BY client_id");
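The first query does two things: it splits each colon-delimited action string into fields, and it uses ROW_NUMBER() per innercode (ordered by trade_date descending) to keep only the most recent rows. That top-N-per-group logic can be sketched in plain Python (hypothetical sample data, not the author's dataset; top_n stands in for the rn <= 5000 cap):

```python
# Sketch of split(action, ':') + ROW_NUMBER() ... WHERE rn <= N:
# split each action, rank rows per innercode by trade_date descending,
# keep only the top N per group.
from collections import defaultdict

actions = ["600519:B:100:1700.0:20230103",
           "600519:S:50:1710.0:20230105",
           "000001:B:200:12.3:20230102"]  # hypothetical sample data

by_innercode = defaultdict(list)
for action in actions:
    fields = action.split(":")  # [innercode, entrust_bs, amount, price, trade_date]
    by_innercode[fields[0]].append(fields)

top_n = 2  # stands in for rn <= 5000
kept = []
for innercode, rows in by_innercode.items():
    rows.sort(key=lambda f: f[4], reverse=True)  # ORDER BY trade_date DESC
    kept.extend(rows[:top_n])                    # WHERE rn <= top_n
print(len(kept))  # 3
```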

3. Using a UDF

import org.apache.spark.sql.functions._
import org.apache.spark.sql._
val sortUdf = udf((rows: Seq[Row]) => {
  rows.map { case Row(id: Int, value: String) => (id, value) }
    .sortBy { case (id, _) => -id } // use id (without the minus sign) for ascending order
    .map { case (_, value) => value }
})

val grouped = df.groupBy(col("type")).agg(collect_list(struct("id", "name")) as "id_name")
val r1 = grouped.select(col("type"), sortUdf(col("id_name")).alias("value_list"))
r1.show(false)
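The UDF itself is ordinary collection code: sort the collected (id, name) pairs by negated id, then drop the ids. Its core logic, expressed as a plain Python function (an illustration, not the Spark UDF):

```python
# Plain-Python equivalent of the Scala sortUdf: sort (id, value) pairs
# by -id (i.e. descending), then project the values.
def sort_udf(rows):
    return [value for _, value in sorted(rows, key=lambda r: -r[0])]

print(sort_udf([(1, "x1"), (3, "x3"), (2, "x2")]))  # ['x3', 'x2', 'x1']
```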
