当前位置: 首页 > news >正文

Hive小文件处理

MR任务

mr任务参考链接

set hive.exec.reducers.max=3

set hive.exec.dynamic.partition.mode = true; --使用动态分区时,设置为ture。 set hive.exec.dynamic.partition.mode = nonstrict; --动态分区模式,默认值:strict,表示必须指定一个分区为静态分区;nonstrict模式表示允许所有的分区字段都可以使用动态分区。一般需要设置为nonstrict。 set hive.exec.max.dynamic.partitions.pernode =10; --在每个执行MR的节点上,最多可以创建多少个动态分区,默认值:100。 set hive.exec.max.dynamic.partitions =1000; --在所有执行MR的节点上,最多一共可以创建多少个动态分区,默认值:1000。 set hive.exec.max.created.files = 100000; --整个MR Job中最多可以创建多少个HDFS文件,默认值:100000。 set hive.error.on.empty.partition = false; --当有空分区产生时,是否抛出异常,默认值:false。 Hive文件产生大量小文件的原因: 一是文件本身的原因:小文件多,以及文件的大小; 二是使用动态分区,可能会导致产生大量分区,从而产生很多小文件,也会导致产生很多Mapper; 三是Reduce数量较多,Hive SQL输出文件的数量和Reduce的个数是一样的。 小文件带来的影响: 文件的数量和大小决定Mapper任务的数量,小文件越多,Mapper任务越多,每一个Mapper都会启动一个JVM来运行,所以这些任务的初始化和执行会花费大量的资源,严重影响性能。 在NameNode中每个文件大约占150字节,小文件多,会严重影响NameNode性能。 解决小文件问题: 如果动态分区数量不可预测,最好不用。如果用,最好使用distributed by分区字段,这样会对字段进行一个hash操作,把相同的分区给同一个Reduce处理; 减少Reduce数量; 进行以一些参数调整。

Hdfs文件数

指定目录下的文件夹,文件,容量大小
[root@mz-hadoop-01 ~]# hdfs dfs -count  /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed568         7433         6065483664 /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed[root@mz-hadoop-01 ~]# hdfs dfs -count -h /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed568        7.3 K              5.6 G /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed

Hive文件数

SELECT tbl_id,SUM(PARAM_VALUE) AS file_cnts
FROM
(
SELECT * FROM PARTITIONS WHERE tbl_id='97387'
) aLEFT JOIN (SELECT * FROM partition_params WHERE PARAM_KEY='numFiles' ) b
ON a.part_id=b.part_idGROUP BY tbl_id
ORDER BY file_cnts DESC;TBL_ID  file_cnts  
------  -----------97387         2082

所有文件数

SELECT SUM(PARAM_VALUE) AS file_cnts
FROM
(
SELECT * FROM PARTITIONS
) aLEFT JOIN (SELECT * FROM partition_params WHERE PARAM_KEY='numFiles' ) b
ON a.part_id=b.part_idfile_cnts  
-----------340323

表文件数topN

SELECT e.*,f.*
FROM 
(SELECT c.*,d.db_id,d.tbl_name
FROM
(
SELECT tbl_id,SUM(PARAM_VALUE) AS file_cnts
FROM
(
SELECT * FROM PARTITIONS
) aLEFT JOIN (SELECT * FROM partition_params WHERE PARAM_KEY='numFiles' ) b
ON a.part_id=b.part_idGROUP BY tbl_id
ORDER BY file_cnts DESC
) cLEFT JOIN (SELECT * FROM tbls
) d
ON c.tbl_id=d.tbl_id) e LEFT JOIN
(SELECT db_id AS db_id2,`desc`,DB_LOCATION_URI,NAME as db_name,OWNER_NAME,OWNER_TYPE FROM dbs
)f ON e.db_id=f.DB_ID2

库文件数topN

select 
db_id,db_name,DB_LOCATION_URI,sum(file_cnts) as file_cnts
from (SELECT e.*,f.*
FROM 
(SELECT c.*,d.db_id,d.tbl_name
FROM
(
SELECT tbl_id,SUM(PARAM_VALUE) AS file_cnts
FROM
(
SELECT * FROM PARTITIONS
) aLEFT JOIN (SELECT * FROM partition_params WHERE PARAM_KEY='numFiles' ) b
ON a.part_id=b.part_idGROUP BY tbl_id
ORDER BY file_cnts DESC
) cLEFT JOIN (SELECT * FROM tbls
) d
ON c.tbl_id=d.tbl_id) e LEFT JOIN
(SELECT db_id AS db_id2,`desc`,DB_LOCATION_URI,NAME as db_name,OWNER_NAME,OWNER_TYPE FROM dbs
)f ON e.db_id=f.DB_ID2)g group by db_id,db_name,DB_LOCATION_URI order by file_cnts desc

小文件压缩任务

package com.mingzhi.common.universalimport com.mingzhi.common.interf.{IDate, MySaveMode}
import com.mingzhi.common.utils.{HiveUtil, SinkUtil, SparkUtils, TableUtils}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel/*** 处理只有一个分区dt的表*/
object table_compress_process {private var hive_dbs: String = "paascloud"private var hive_tables: String = "dwd_order_info_abi"private var dt: String = "2023-06-30"private var dt1: String = "2023-06-30"def main(args: Array[String]): Unit = {System.setProperty("HADOOP_USER_NAME", "root")val builder = SparkUtils.getBuilderif (System.getProperties.getProperty("os.name").contains("Windows")) {builder.master("local[*]")} else {hive_dbs = args(0)hive_tables = args(1)dt = args(2)dt1 = args(3)}val spark: SparkSession = builder.appName("clean_process").getOrCreate()HiveUtil.openDynamicPartition(spark)spark.sql("set spark.sql.shuffle.partitions=1")if ("all".equalsIgnoreCase(hive_dbs)) {val builder = new StringBuilder()val frame_db = spark.sql("show databases").select("databaseName")frame_db.show(false)frame_db.collect().foreach(db => {builder.append(db.toString().replace("[", "").replace("]", ","))})println("dbs:" + builder.toString())hive_dbs = builder.toString()}hive_dbs.split(",").foreach(db => {if (StringUtils.isNotBlank(db)) {if ("all".equalsIgnoreCase(hive_tables)) {compress_all_table(spark, db)} else {hive_tables.split(",").foreach(t => {compress_the_table(spark, db, t)})}}})spark.stop()}private def compress_the_table(spark: SparkSession, hive_db: String, table: String): Unit = {println("compress_the_table======>:" + hive_db + "." + table)spark.sql(s"use $hive_db")if (TableUtils.tableExists(spark, hive_db, table)) {try {new IDate {override def onDate(dt: String): Unit = {/*** 建议:对需要checkpoint的RDD,先执行persist(StorageLevel.DISK_ONLY)*/val f1 = spark.sql(s"""||select * from $hive_db.$table where dt='$dt'|""".stripMargin).persist(StorageLevel.MEMORY_ONLY)val r_ck: (DataFrame, String) = SparkUtils.persistDataFrame(spark, f1)val f2 = r_ck._1println("f2 show===>")f2.show(false)val type_ = TableUtils.getCompressType(spark, hive_db, table)if ("HiveFileFormat".equalsIgnoreCase(type_)) {println("sink HiveFileFormat table:" + table)SinkUtil.sink_to_hive_HiveFileFormat(spark, f2, hive_db, table, null)} else {//spark表SinkUtil.sink_to_hive(dt, spark, f2, hive_db, table, type_, MySaveMode.OverWriteByDt, 1)}spark.sql(s"drop table ${r_ck._2} ")}}.invoke(dt, dt1)} catch {case e: org.apache.spark.sql.AnalysisException => {println("exception1:" + e)}case e: Exception => println("exception:" + e)}}}private def compress_all_table(spark: SparkSession, hive_db: String): Unit = {spark.sql(s"use $hive_db")val frame_table = spark.sql(s"show tables")frame_table.show(100, false)frame_table.printSchema()frame_table.filter(r => {!r.getAs[Boolean]("isTemporary")}).select("tableName").collect().foreach(r => {//r:[ads_order_topn]val table = r.toString().replace("[", "").replace("]", "")println("compress table:" + hive_db + "." + table)if (TableUtils.tableExists(spark, hive_db, table)) {try {new IDate {override def onDate(dt: String): Unit = {val f1 = spark.sql(s"""||select * from $hive_db.$table where dt='$dt'|""".stripMargin)SinkUtil.sink_to_hive(dt, spark, f1, hive_db, table, "orc", MySaveMode.OverWriteByDt, 1)}}.invoke(dt, dt1)} catch {case e: org.apache.spark.sql.AnalysisException => {println("exception1:" + e)}case e: Exception => println("exception:" + e)}}})}
}
http://www.lryc.cn/news/240746.html

相关文章:

  • go语言学习之旅之Go语言函数
  • mysql的联合索引最左匹配原则问题
  • 三层交换机实现不同VLAN间通讯
  • C#枚举的使用
  • .Net6使用WebSocket与前端进行通信
  • hadoop 编写开启关闭集群脚本, hadoop hdfs,yarn开启关闭脚本。傻瓜式hadoop脚本 hadoop(九)
  • ArrayList中放的是一个对象,如何同时根据对象中的三个字段对List进行排序
  • MONGODB 的基础 NOSQL注入基础
  • 单链表实现【队列】
  • 随机微分方程的MATLAB数值求解
  • ChatGPT 也并非万能,品牌如何搭上 AIGC「快班车」
  • 【JavaSE】不允许你不会使用String类
  • 身份证阅读器和社保卡读卡器Harmony鸿蒙系统ArkTS语言SDK开发包
  • 并发与并行
  • 搭个网页应用,让ChatGPT帮我写SQL
  • 实时云渲染 助力破解智慧园区痛点困局
  • 计算机组成原理2
  • Py之PyMuPDF:PyMuPDF的简介、安装、使用方法之详细攻略
  • 2023亚太杯数学建模A题B题C题思路模型代码论文指导
  • 【C/PTA】函数专项练习(四)
  • 广西柳州机械异形零部件三维扫描3D抄数全尺寸测绘建模-CASAIM中科广电
  • (四)C语言之符号常量概述
  • springboot -sse -flux 服务器推送消息
  • js进阶笔记之原型,原型链
  • 【DevOps】Git 图文详解(四):Git 使用入门
  • Jquery ajax 同步阻塞引起的UI线程阻塞的坑(loading图片显示不出来 )
  • 读书笔记——《黑猩猩的政治》
  • 此处不允许使用特性namespace
  • 随笔记录-springmvc_ResourceHandlerRegistry+ResourceHttpRequestHandler
  • Redis面试内容,Redis过期策略,Redis持久化方式,缓存穿透、缓存击穿和缓存雪崩,以及解决办法