当前位置: 首页 > news >正文

sparkSQL的使用

sparksql只能处理结构化数据

基于rdd构建dataframe对象

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerTypeif __name__ == '__main__':spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()sc = spark.sparkContext#基于rdd建表rdd = sc.textFile('../data/input/word.txt').map(lambda x:x.split(' ')).\map(lambda x:(x[0],int(x[1])))### 使用rdd构建dataframedf = spark.createDataFrame(rdd,schema=['name','age'])df.printSchema()df.show(20,False)df.createOrReplaceTempView('people')spark.sql('select * from people where age<20').show()#使用toDF的方式构建dataframedf = rdd.toDF(['name','age'])df.printSchema()df.show()# 使用structType的方式构建dataframeschema = StructType.add('name',StringType(),nullable=True).add('age',IntegerType(),nullable=False)df2 = rdd.toDF(schema=schema)df2.printSchema()df2.show()

sparksql读取本地文件

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
if __name__ == '__main__':spark = SparkSession.builder.appName('test').master('local[*]').config('spark.sql.shuffle.partitions',2).getOrCreate()sc = spark.sparkContextschema = StructType().add('data',StringType(),nullable=True)# 读取txt文件df = spark.read.format('text').schema(schema=schema).load('../data/input/word.txt')# 读取csv文件df = spark.read.format('csv').option('sep',',').option('header',True).load('../data/input/word.txt')df.printSchema()df.show()

数据的写出(text,csv,json,parqeut)

在这里插入图片描述

DSL函数用法汇总

agg,是GroupData对象的api,作用是可以写多个聚合
alias,是column对象的api,可以针对一个列,进行改名
withColumnRenamed,是DataFrame的api,可以对DF的列进行改名,一次改一列,改多列可用链式调用
orderBy,DataFrame的api,进行排序参数1是要排序的字段,参数2是升序降序
first,DataFrame的api,去除DF的第一行数据,返回值是一个Row对象.是一个数组类型,用row[‘列名’]来取出当前值
df.dropDuplicates.show() 去重函数,无参数时是对所有列去重,可以加入参数对指定列去重dropDuplicates([‘age’])
df.dropna().show() 删除空值,dropna(thresh=2,subset=[‘name’,‘age’])这两列如果数据没有两列就会被删除
df.fillna().show() 把空值填充fillna({‘name’:‘未知姓名’,‘age’:1}),把名字那栏空值填为未知姓名,age为1

spark.sql.shuffle.partitions参数的设置

    spark = SparkSession.builder.appName('test').master('local[*]').config('spark.sql.shuffle.partitions',2).getOrCreate()

设置参数的意义是提高local模式下的性能,yarn集群下要根据cpu核数的2~4倍进行设置.

http://www.lryc.cn/news/92236.html

相关文章:

  • gitignore的语法
  • 长袋除尘器
  • HTML+CSS实训——Day02——仿一个网易云音乐的登陆界面
  • css蓝桥杯--电影院排座位
  • c++学习——多态
  • Java SPI机制及原理详解
  • 不压缩打包layui
  • 过去、现在及未来
  • leetcode701. 二叉搜索树中的插入操作(java)
  • Docker的容器管理操作
  • 计算机组成原理——中央处理器
  • tidb变更大小写敏感问题的总结
  • 法规标准-UN R158标准解读
  • 160个CrackMe之002
  • 3. 响应状态码及Response对象的status_code属性
  • MIME 类型列表 03
  • SpringBoot项目登录并接入MFA二次认证
  • 算法与数据结构(三)
  • 亚马逊云科技出海日,让数字经济出海扩展到更多行业和领域
  • Pb协议的接口测试
  • 2. 分布式文件系统 HDFS
  • 借助金融科技差异化发展,不一样的“破茧”手法
  • typescript中type、interface的区别
  • Ingress详解
  • 【递归算法的Java实现及其应用】
  • 2023年度第四届全国大学生算法设计与编程挑战赛(春季赛)
  • 如何用PHP获取各大电商平台的数据
  • 一站式完成车牌识别任务:从模型优化到端侧部署
  • Linux4.8Nginx Rewrite
  • DHT11温湿度传感器