当前位置: 首页 > news >正文

kv类型算子使用

对kv类型的RDD数据集进行操作。

keys

"""
获取所有的key转换算子"""inputRdd = sc.parallelize([('laoda', 11), ('laoer', 22), ('laosan', 33), ('laosi', 44)])
print(inputRdd.keys().collect())
# ['laoda', 'laoer', 'laosan', 'laosi']

values

"""
获取所有的value转换算子"""inputRdd = sc.parallelize([('laoda', 11), ('laoer', 22), ('laosan', 33), ('laosi', 44)])
print(inputRdd.values().collect())
# [11, 22, 33, 44]

mapValues

"""
拿到所有的value值 对value进行改变 返回值 仍是以前的map转换算子
"""
inputRdd = sc.parallelize([('laoda', 11), ('laoer', 22), ('laosan', 33), ('laosi', 44)])
print(inputRdd.mapValues(lambda values: values + 1).collect())# [('laoda', 12), ('laoer', 23), ('laosan', 34), ('laosi', 45)]

collectAsMap

"""
将二元组类型的RDD转换成一个Dict字典
必须是二元组 从表中查询后的结果需要先转为rdd ,再使用map将其转为二元组
触发算子
"""inputRdd = sc.parallelize([('laoda', 11), ('laoer', 22), ('laosan', 33), ('laosi', 44)])print(inputRdd.collectAsMap())# [('laoda', 11), ('laoer', 22), ('laosan', 33), ('laosi', 44)]
# {'laoda': 11, 'laoer': 22, 'laosan': 33, 'laosi': 44}dimMap = spark.sql("""
select * from dim.area_geo
""").rdd.map(lambda row:(row.geohash5,row.province+"-"+row.city+"-"+row.street)).collectAsMap()
print(dimMap)

join

join也可以视为kv类型的算子,因为是通过key值进行join操作的

rdd_singer_age = sc.parallelize([("周杰伦", 43), ("陈奕迅", 47), ("蔡依林", 41), ("林子祥", 74), ("陈升", 63)])rdd_singer_music = sc.parallelize([("周杰伦", "青花瓷"), ("陈奕迅", "孤勇者"), ("蔡依林", "日不落"), ("林子祥", "男儿当自强"),("动力火车", "当")])# leftOuterJoin 左为主 否则为None 外连接
print(rdd_singer_age.leftOuterJoin(rdd_singer_music).collectAsMap())
# join 内连接
print(rdd_singer_age.join(rdd_singer_music).collectAsMap())
# fullOuterJoin 全外连接 连接不上为None
print(rdd_singer_age.fullOuterJoin(rdd_singer_music).collectAsMap()){'蔡依林': (41, '日不落'), '陈升': (63, None), '陈奕迅': (47, '孤勇者'), '林子祥': (74, '男儿当自强'), '周杰伦': (43, '青花瓷')}{'蔡依林': (41, '日不落'), '陈奕迅': (47, '孤勇者'), '林子祥': (74, '男儿当自强'), '周杰伦': (43, '青花瓷')}{'动力火车': (None, '当'), '蔡依林': (41, '日不落'), '陈升': (63, None), '陈奕迅': (47, '孤勇者'), '林子祥': (74, '男儿当自强'), '周杰伦': (43, '青花瓷')}

http://www.lryc.cn/news/501152.html

相关文章:

  • 3维建模blender
  • 百问FB网络编程 - UDP编程简单示例
  • 面试题:什么是ThreadLocal,如何实现的?
  • js后端开发之Next.js、Nuxt.js 与 Express.js
  • 飞牛Nas如何实现阿里云盘、百度网盘的资料迁移!
  • 如何在小米平板5上运行 deepin 23 ?
  • 【PlantUML系列】流程图(四)
  • 操作系统:进程、线程与作业
  • 先验地图--slam学习笔记
  • 空指针异常:软件开发中的隐形陷阱
  • 【Java从入门到放弃 之 GC】
  • 【C++】等差数列末项计算题解析及优化
  • vue中父组件接收子组件的多个参数的方法:$emit或事件总线
  • 2024.12.10——攻防世界Web_php_include
  • 【机器学习算法】——数据可视化
  • 如何在 Android 项目中实现跨库传值
  • JavaCV之FFmpegFrameFilter视频转灰度
  • Redis:基于PubSub(发布/订阅)、Stream流实现消息队列
  • C#飞行棋(新手简洁版)
  • 【OpenCV】图像转换
  • 力扣 重排链表-143
  • 【Kubernetes理论篇】容器集群管理系统Kubernetes(K8S)
  • Kubernetes 常用操作大全:全面掌握 K8s 基础与进阶命令
  • 爬虫基础之Web网页基础
  • k8s, deployment
  • 使用ensp搭建OSPF+BGP和静态路由,底层PC使用dhcp,实现PC互通
  • TÜLU 3: Pushing Frontiers in Open Language Model Post-Training
  • 深入解读 MySQL EXPLAIN 与索引优化实践
  • Flume——进阶(agent特性+三种结构:串联,多路复用,聚合)
  • ragflow连ollama时出现的Bug