
Spark Tutorial 5 - Basic Structured Operations

Load a JSON file

df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")

Schema

Print the schema

df.printSchema()
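The same information is available programmatically through the schema attribute, which returns a StructType (the exact fields depend on the file loaded):

df.schema
# e.g. StructType([StructField('DEST_COUNTRY_NAME', StringType(), True), ...])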

Read a JSON file with an explicit schema to specify the column data types

from pyspark.sql.types import StructField, StructType, StringType, LongType

mySchema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), False)
])
df = spark.read.format("json").schema(mySchema).load("/Users/yangyong/dev/learn_spark/2015-summary.json")

Get the first row

df.first()

Create a Row

from pyspark.sql import Row

myRow = Row("Hello", None, 1, False)
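A Row behaves like an ordered sequence, so its values can be read by position; a minimal sketch using the myRow above:

myRow[0]   # 'Hello'
myRow[2]   # 1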

Create DataFrames

Load a JSON file as a DataFrame

df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")

Combine a schema and Rows into a DataFrame

Schema1 = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("country", StringType(), True)
])
row1 = Row('1', 'Oscar', 'United States')
row2 = Row('2', 'China', 'England')
myDF = spark.createDataFrame([row1, row2], schema=Schema1)
myDF.show()
"""
+---+-----+-------------+
| id| name|      country|
+---+-----+-------------+
|  1|Oscar|United States|
|  2|China|      England|
+---+-----+-------------+
"""

Two ways to query: select and selectExpr

select

from pyspark.sql.functions import expr, col, column

df.select('dest_country_name').show(2)
df.select('dest_country_name', 'origin_country_name').show(2)
df.select(expr('dest_country_name'), col('dest_country_name'), column('dest_country_name')).show(2)
"""
+-----------------+
|dest_country_name|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows

+-----------------+-------------------+
|dest_country_name|origin_country_name|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows

+-----------------+-----------------+-----------------+
|dest_country_name|dest_country_name|dest_country_name|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows
"""

Rename columns

df.select(expr('dest_country_name as destination')).show(2)
df.select(col('dest_country_name').alias('destination')).show(2)
"""
+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows
"""

selectExpr

Rename columns

df.selectExpr('dest_country_name as destination', 'dest_country_name').show(2)
"""
+-------------+-----------------+
|  destination|dest_country_name|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows
"""

Add a column

df.selectExpr('*', '(dest_country_name = origin_country_name) as withinCountry').show(2)
"""
+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows
"""

Equivalent SQL:

SELECT *, (dest_country_name = origin_country_name) as withinCountry 
FROM dfTable limit 2
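To actually run this, the DataFrame must first be registered as a temporary view; a minimal sketch, assuming the view name dfTable used above:

df.createOrReplaceTempView("dfTable")
spark.sql("""
    SELECT *, (dest_country_name = origin_country_name) as withinCountry
    FROM dfTable LIMIT 2
""").show()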

Use aggregate functions

df.selectExpr('avg(count)', 'count(distinct(dest_country_name))').show(2)
"""
+-----------+---------------------------------+
| avg(count)|count(DISTINCT dest_country_name)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+
"""

Add columns with withColumn

from pyspark.sql.functions import lit

df.withColumn('numberOne', lit(1)).show(2)
df.withColumn('withinCountry', expr('dest_country_name == origin_country_name')).show(2)
"""
+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows
"""

Rename columns with withColumnRenamed

df.withColumnRenamed('dest_country_name', 'dest').show(2)
"""
+-------------+-------------------+-----+
|         dest|ORIGIN_COUNTRY_NAME|count|
+-------------+-------------------+-----+
|United States|            Romania|   15|
|United States|            Croatia|    1|
+-------------+-------------------+-----+
only showing top 2 rows
"""

Drop columns

df.drop('origin_country_name').show(2)
"""
+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|    United States|   15|
|    United States|    1|
+-----------------+-----+
only showing top 2 rows
"""

Change a column's type

df.withColumn('count2', col('count').cast('long'))
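Note that withColumn returns a new DataFrame and leaves df unchanged, so the result must be captured; a minimal sketch that verifies the cast:

df2 = df.withColumn('count2', col('count').cast('long'))
df2.printSchema()  # count2 appears alongside count, with type long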

Filter rows with filter/where

The two are equivalent

df.filter('count < 2').show(2)
df.where('count < 2').show(2)
df.where(col('count') < 2).show(2)
"""
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows
"""

Filter with multiple conditions

df.where('count < 2').where('dest_country_name != "United States"').show(2)
"""
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|            Malta|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows
"""

Deduplicate rows

df.select('dest_country_name', 'origin_country_name').distinct().count()
"""
equivalent SQL:
SELECT COUNT(DISTINCT(dest_country_name, origin_country_name)) FROM dfTable;
"""

Union DataFrames

Two DataFrames can be unioned only if they have the same schema and number of columns; union matches columns by position (for name-based matching, see the unionByName sketch after the example below)

from pyspark.sql import Row
schema = df.schema
newRows = [Row("New Country", "Other Country", 5),Row("New Country 2", "Other Country 3", 1)
]
newDF = spark.createDataFrame(newRows, schema)# in Python
df.union(newDF)\.where("count = 1")\.where(col("ORIGIN_COUNTRY_NAME") != "United States")\.show()
"""
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+
"""

Sort rows with sort/orderBy

The two are equivalent


df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)from pyspark.sql.functions import desc, ascdf.orderBy(expr("count desc")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

Limit

df.limit(5).show()
df.orderBy(expr("count desc")).limit(6).show()