当前位置: 首页 > news >正文

3、JSON数据的处理

3.1 介绍

JSON数据

  • Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame

    Spark SQL能够自动将JSON数据集以结构化的形式加载为一个DataFrame

  • This conversion can be done using SparkSession.read.json on a JSON file

    读取一个JSON文件可以用SparkSession.read.json方法

从JSON到DataFrame

  • 指定DataFrame的schema

    1,通过反射自动推断,适合静态数据

    2,程序指定,适合程序运行中动态生成的数据

加载json数据

#使用内部的schema
jsonDF = spark.read.json("xxx.json")
jsonDF = spark.read.format('json').load('xxx.json')#指定schema
jsonDF = spark.read.schema(jsonSchema).json('xxx.json')

嵌套结构的JSON

  • 重要的方法

    1,get_json_object

    2,get_json

    3,explode

3.2 实践

3.1 静态json数据的读取和操作

无嵌套结构的json数据

from pyspark.sql import SparkSession
spark =  SparkSession.builder.appName('json_demo').getOrCreate()
sc = spark.sparkContext# ==========================================
#                无嵌套结构的json
# ==========================================
jsonString = [
"""{ "id" : "01001", "city" : "AGAWAM",  "pop" : 15338, "state" : "MA" }""",
"""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""
]

从json字符串数组得到DataFrame

# 从json字符串数组得到rdd有两种方法
# 1. 转换为rdd,再从rdd到DataFrame
# 2. 直接利用spark.createDataFrame(),见后面例子jsonRDD = sc.parallelize(jsonString)   # stringJSONRDD
jsonDF =  spark.read.json(jsonRDD)  # convert RDD into DataFrame
jsonDF.printSchema()
jsonDF.show()

直接从文件生成DataFrame

# -- 直接从文件生成DataFrame
#只有被压缩后的json文件内容,才能被spark-sql正确读取,否则格式化后的数据读取会出现问题
jsonDF = spark.read.json("xxx.json")
# or
# jsonDF = spark.read.format('json').load('xxx.json')jsonDF.printSchema()
jsonDF.show()jsonDF.filter(jsonDF.pop>4000).show(10)
#依照已有的DataFrame,创建一个临时的表(相当于mysql数据库中的一个表),这样就可以用纯sql语句进行数据操作
jsonDF.createOrReplaceTempView("tmp_table")resultDF = spark.sql("select * from tmp_table where pop>4000")
resultDF.show(10)

3.2 动态json数据的读取和操作

指定DataFrame的Schema

3.1节中的例子为通过反射自动推断schema,适合静态数据

下面我们来讲解如何进行程序指定schema

没有嵌套结构的json

jsonString = [
"""{ "id" : "01001", "city" : "AGAWAM",  "pop" : 15338, "state" : "MA" }""",
"""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""
]jsonRDD = sc.parallelize(jsonString)from pyspark.sql.types import *#定义结构类型
#StructType:schema的整体结构,表示JSON的对象结构
#XXXStype:指的是某一列的数据类型
jsonSchema = StructType() \.add("id", StringType(),True) \.add("city", StringType()) \.add("pop" , LongType()) \.add("state",StringType())jsonSchema = StructType() \.add("id", LongType(),True) \.add("city", StringType()) \.add("pop" , DoubleType()) \.add("state",StringType())reader = spark.read.schema(jsonSchema)jsonDF = reader.json(jsonRDD)
jsonDF.printSchema()
jsonDF.show()

带有嵌套结构的json

from pyspark.sql.types import *
jsonSchema = StructType([StructField("id", StringType(), True),StructField("city", StringType(), True),StructField("loc" , ArrayType(DoubleType())),StructField("pop", LongType(), True),StructField("state", StringType(), True)
])reader = spark.read.schema(jsonSchema)
jsonDF = reader.json('data/nest.json')
jsonDF.printSchema()
jsonDF.show(2)
jsonDF.filter(jsonDF.pop>4000).show(10)
http://www.lryc.cn/news/110908.html

相关文章:

  • 8月5日上课内容 nginx的优化和防盗链
  • 网络爬虫请求头中的Referer和User-Agent与代理IP的配合使用
  • RabbitMQ 生产者-消息丢失 之 场景分析
  • Hyper实现git bash在windows环境下多tab窗口显示
  • Matlab的信号频谱分析——FFT变换
  • 如何从 Android 设备恢复已删除的文件?
  • servlet生命周期和初始化参数传递
  • dvwa靶场通关(十一)
  • 【Spring】使用注解存储Bean对象
  • 怎么维护好自己的电脑
  • vscode中无法使用git解决方案
  • MybatisPlus-CRUD,不带条件构造器的常用方法
  • 软件测试面试【富途面经分享】
  • antd 库的 Table 组件中删除一个或多个选中的列表
  • 针对java程序员的了解细节操作系统与进程
  • 判定是否互为字符重排、回文排列
  • QT QTextCharFormat 说明和使用
  • 掌握Memory Profiler技巧:识别内存问题
  • Linux学习之正则表达式元字符和grep命令
  • 熟练掌握ChatGPT解决复杂问题——学会提问
  • JVM之类加载与字节码
  • 【博客688】如何实现keepalived vip监控与告警
  • [QT编程系列-39]:用户界面UI - 样式表QSS与样式文件快速入门
  • 机器学习和深度学习简述
  • diffusion model2 扩散模型的文本信息融合、交叉注意力机制、lora
  • 数据结构——二叉树
  • 架构训练营学习笔记:5-3接口高可用
  • 【笔记】湖仓一体架构演进与发展
  • 政务云建设与应用解决方案[42页PPT]
  • 20天突破英语四级高频词汇——第①天