当前位置: 首页 > news >正文

Pyspark中catalog的作用与常用方法

文章目录

  • Pyspark catalog用法
    • catalog 介绍
    • cache 缓存表
    • uncache 清除缓存表
    • cleanCache 清理所有缓存表
    • createExternalTable 创建外部表
    • currentDatabase 返回当前默认库
    • tableExists 检查数据表是否存在,包含临时视图
    • databaseExists 检查数据库是否存在
    • dropGlobalTempView 删除全局临时视图
    • dropTempView 删除临时视图
    • functionExists 检查函数是否存在
    • getDatabase 获取具有指定名称的数据库
    • getFunction 获取方法
    • getTable 获取数据表
    • isCached 检查是否缓存成功
    • listCatalogs 列出可用的catalogs
    • listColumns 返回数据表的列信息
    • listDatabases 获取数据库列表
    • listTables 获取数据表,包含临时视图
    • setCurrentDatabase 设置当前数据库
    • refreshTable 刷新缓存
    • refreshByPath 刷新路径
    • recoverPartitions 恢复分区

Pyspark catalog用法

catalog 介绍

Catalog是Spark中用于管理元数据信息的接口,这些元数据可能包括库、内部或外部表、函数、表列及临时视图等。

总的来说,PySpark Catalogs是PySpark框架中用于管理和查询元数据的重要组件,它使得Python用户能够更有效地利用PySpark进行大数据处理和分析。

spark = SparkSession.builder.appName('LDSX_TEST') \.config('hive.metastore.uris', 'thrift://hadoop01:9083') \.config('spark.master',"local[2]" ) \.enableHiveSupport().getOrCreate()

cache 缓存表

可以设置缓存等级,默认缓存等级为MEMORY_AND_DISK,是数据表级别的缓存,跟缓存dataframe存在区别,

设置不存在的表报错

# 缓存数据表
spark.catalog.cacheTable('ldsx_test.ldsx_table_one')
#检查是否缓存成功
ldsx = spark.catalog.isCached('ldsx_test.ldsx_table_one')
>True

uncache 清除缓存表

当表不存在数据库会报错

spark.catalog.uncacheTable("ldsx_test.ldsx_table_one")

cleanCache 清理所有缓存表

spark.catalog.clearCache()

createExternalTable 创建外部表

# spark.catalog.createExternalTable(#     tableName='ldsx_test_table',#     path = './ldsx_one.csv',#     database='ldsx_test',## )

currentDatabase 返回当前默认库

返回当前默认所在数据库spark.catalog.setCurrentDatabase 设置所在数据库

data = spark.catalog.currentDatabase()

tableExists 检查数据表是否存在,包含临时视图

data = spark.catalog.tableExists('ldsx_test.ldsx_table_one')
>True

databaseExists 检查数据库是否存在

data = spark.catalog.databaseExists('ldsx_test')

dropGlobalTempView 删除全局临时视图

全局临时表查找时候需要指向global_temp

要删除的表不存在报错

#创建全局临时表
spark.createDataFrame([(1, 1)]).createGlobalTempView("my_table")
#注意查询时候需要指向 global_temp
spark.sql('select * from global_temp.my_table').show()
#删除全局临时
ldsx= spark.catalog.dropGlobalTempView("my_table")

dropTempView 删除临时视图

要删除的表不存在报错

#创建临时表
spark.createDataFrame([(1, 1)]).createTempView("my_table")
spark.sql('select * from my_table').show()
#删除临时表
ldsx = spark.catalog.dropTempView("my_table")

functionExists 检查函数是否存在

spark.catalog.functionExists("count")
>True

getDatabase 获取具有指定名称的数据库

data = spark.catalog.getDatabase("ldsx_test")
print(data)
>>Database(name='ldsx_test', catalog='spark_catalog', description='', locationUri='hdfs://master:7171/home/ldsx/opt/hadoopData/hive_data/ldsx_test.db')

getFunction 获取方法

获取不到方法报错

spark.sql("CREATE FUNCTION my_func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
data = spark.catalog.getFunction("my_func1")
print(data)
>>Function(name='my_func1', catalog='spark_catalog', namespace=['default'], description='N/A.', className='test.org.apache.spark.sql.MyDoubleAvg', isTemporary=False)

getTable 获取数据表

获取不到表报错

data = spark.catalog.getTable("ldsx_table_one")
print(data)
>>Table(name='ldsx_table_one', catalog='spark_catalog', namespace=['ldsx_test'], description=None, tableType='MANAGED', isTemporary=False)

isCached 检查是否缓存成功

# 缓存数据表
spark.catalog.cacheTable('ldsx_test.ldsx_table_one')
data = spark.catalog.isCached('ldsx_test.ldsx_table_one')
>True

listCatalogs 列出可用的catalogs

catalogs =  spark.catalog.listCatalogs()
print(catalogs)

listColumns 返回数据表的列信息

# 参数:数据表,数据库
catalogs =  spark.catalog.listColumns('ldsx_table_one','ldsx_test')
print(catalogs)
>>    [Column(name='age', description='??', dataType='string', nullable=True, isPartition=False, isBucket=False),Column(name='name', description='??', dataType='string', nullable=True, isPartition=False, isBucket=False),Column(name='fraction', description='??', dataType='string', nullable=True, isPartition=False, isBucket=False),Column(name='class', description='??', dataType='string', nullable=True, isPartition=False, isBucket=False),Column(name='gender', description='??', dataType='string', nullable=True, isPartition=False, isBucket=False)]

listDatabases 获取数据库列表

data1 = spark.catalog.listDatabases()
print(data1)
>>[Database(name='default', catalog='spark_catalog', description='Default Hive database',locationUri='hdfs://master:7171/home/ldsx/opt/hadoopData/hive_data'),
Database(name='ldsx_test', catalog='spark_catalog', description='',locationUri='hdfs://master:7171/home/ldsx/opt/hadoopData/hive_data/ldsx_test.db')]

listTables 获取数据表,包含临时视图

# 展示数据库中数据表以及临时视图
spark.catalog.setCurrentDatabase('ldsx_test')
spark.createDataFrame([(1,1)]).createTempView('TEST')
data = spark.catalog.listTables()
print(data)
>>[Table(name='ldsx_table_one', catalog='spark_catalog', namespace=['ldsx_test'], description=None,tableType='MANAGED', isTemporary=False),Table(name='TEST', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

setCurrentDatabase 设置当前数据库

spark.catalog.setCurrentDatabase('ldsx_test')
data = spark.catalog.currentDatabase()
print(data)
>> ldsx_test

refreshTable 刷新缓存

看官网案例是,刷新已经缓存的表
当一个表执行了cacheTable后,元数据有变动使用refreshTable进行元数据刷新

refreshByPath 刷新路径

# 假设有一个 Hive 表,其数据存储在 HDFS 上的某个路径
path = "/user/hive/warehouse/mydb.db/mytable"
# 刷新该路径下的表或分区信息
spark.catalog.refreshByPath(path)
df = spark.sql("SELECT * FROM mydb.mytable")
df.show()

recoverPartitions 恢复分区

recoverPartitions尝试恢复 Hive 表中丢失的分区信息,实际使用后更新
http://www.lryc.cn/news/432072.html

相关文章:

  • 聚焦2024数博会|与天空卫士一起探索AI与数据安全的融合应用
  • 实战docker第二天——cuda11.8,pytorch基础环境docker打包
  • 企业数字化转型的利器:RFID资产管理系统
  • matplotlib中文乱码问题
  • 提高开发效率的实用工具库VueUse
  • 【数据结构】你真的学会了二叉树了吗,来做一做二叉树的算法题及选择题
  • 压力测试知识总结
  • @import导入样式以及scss变量应用与static目录
  • 分类中的语义一致性约束:助力模型优化
  • 前端框架介绍
  • java基础知识-JVM知识详解
  • 流动会场:以声学专利为核心的完美移动场地—轻空间
  • 深度学习(一)-感知机+神经网络+激活函数
  • 目标检测-YOLOv4
  • 一台笔记本电脑的硬件都有哪些以及对应的功能
  • 【程序分享1】第一性原理计算 + 数据处理程序
  • 【数据结构】栈与队列OJ题(用队列实现栈)(用栈实现队列)
  • element-ui打包之后图标不显示,woff、ttf加载404
  • 探究零工市场小程序如何改变传统兼职模式
  • MySQL数据库安装(详细)—>Mariadb的安装(day21)
  • 微信小程序实践案例
  • DataLoader使用
  • CSS学习11--版心和布局流程以及几种分布的例子
  • NetSuite AI 图生代码
  • Java - BigDecimal计算中位数
  • Tensorflow2如何读取自制数据集并训练模型?-- Tensorflow自学笔记13
  • JVM系列(七) -对象的内存分配流程
  • Apache Ignite 在处理大规模数据时有哪些优势和局限性?
  • 怎么利用NodeJS发送视频短信
  • WebAPI(三)、 DOM 日期对象Date;获取事件戳;根据节点关系查找节点