Clearing Historical Partition Data with a UDF on Legacy Hive (1.2.1)

Goal: clear a table's historical partition data through a UDF.

Input parameters: table name, number of days N to retain.

1. pom file

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>hive-udf-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>hive-udf-example</name>
    <description>Hive UDF for deleting partitions by date</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Hive Exec (Hive 1.2.1) -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.1</version>
        </dependency>
        <!-- Hive Metastore (Hive 1.2.1) -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>1.2.1</version>
        </dependency>
        <!-- Hadoop Client (Hive 1.2.1 builds against Hadoop 2.7.3) -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <!-- SLF4J logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- JUnit (tests only) -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
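
The article assumes the packaged jar is already on HDFS before the add jar statement in section 3. As a minimal sketch (the target directory below is a placeholder; the jar name follows from the artifactId and version above), the jar produced by mvn clean package can be uploaded from within the Hive CLI using a dfs command:

-- Hypothetical HDFS directory; adjust to your environment
dfs -mkdir -p /user/hive/udf;
dfs -put -f target/hive-udf-example-1.0-SNAPSHOT.jar /user/hive/udf/;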

2. Java code

package org.udf;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

@Description(name = "del_dt",
        value = "Drops a table's partitions older than N days by deleting the HDFS files and syncing the metastore - arguments: table name, N (days)")
public class del_dt extends UDF {

    /* To reuse this UDF, adjust two things: dbName (the warehouse schema) and sdf (the partition value format) */
    private static final Logger LOG = LoggerFactory.getLogger(del_dt.class);

    public String evaluate(String tableName, int days) {
        if (tableName == null || days < 0) {
            return "Error: table name must not be null and days must not be negative";
        }
        Configuration conf = new Configuration();
        FileSystem fs = null;
        HiveMetaStoreClient client = null;
        int deletedCount = 0;
        try {
            // Get the HDFS file system
            fs = FileSystem.get(conf);
            // Get the Hive metastore client
            HiveConf hiveConf = new HiveConf(conf, this.getClass());
            client = new HiveMetaStoreClient(hiveConf);

            // Parse the table name (handles the db.table format)      -- variable that must be adapted
            String dbName = "bjsythzczcpt";
            String tableOnlyName = tableName;
            if (tableName.contains(".")) {
                String[] parts = tableName.split("\\.");
                dbName = parts[0];
                tableOnlyName = parts[1];
            }

            // Check that the table exists
            if (!client.tableExists(dbName, tableOnlyName)) {
                return "Error: table " + tableName + " does not exist";
            }

            // Get the table metadata
            Table table = client.getTable(dbName, tableOnlyName);

            // Check that the table is partitioned
            List<FieldSchema> partitionKeys = table.getPartitionKeys();
            if (partitionKeys == null || partitionKeys.isEmpty()) {
                return "Error: table " + tableName + " is not a partitioned table";
            }

            // Check that a date partition column (assumed to be named dt) exists
            boolean hasDatePartition = false;
            for (FieldSchema key : partitionKeys) {
                if (key.getName().equalsIgnoreCase("dt")) {
                    hasDatePartition = true;
                    break;
                }
            }
            if (!hasDatePartition) {
                return "Error: table " + tableName + " has no date partition column (dt)";
            }

            // Compute the cutoff date N days ago
            Calendar cal = Calendar.getInstance();
            cal.add(Calendar.DAY_OF_YEAR, -days);
            Date cutoffDate = cal.getTime();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");   // partition value format
            String cutoffDateStr = sdf.format(cutoffDate);

            // List all partitions of the table
            List<Partition> partitions = client.listPartitions(dbName, tableOnlyName, (short) -1);

            // Table location (used to build partition paths)
            String tableLocation = table.getSd().getLocation();

            // Iterate over the partitions and drop those older than N days
            for (Partition partition : partitions) {
                Map<String, String> partitionValues = getPartitionValues(client, partition);
                String dtValue = partitionValues.get("dt");
                if (dtValue != null) {
                    try {
                        Date partitionDate = sdf.parse(dtValue);
                        if (partitionDate.before(cutoffDate)) {
                            // Build the partition path
                            String partitionPath = buildPartitionPath(tableLocation, partition.getValues(), partitionKeys);
                            Path hdfsPath = new Path(partitionPath);
                            // Delete the partition data on HDFS
                            if (fs.exists(hdfsPath)) {
                                fs.delete(hdfsPath, true);
                                LOG.info("Deleted HDFS partition path: {}", partitionPath);
                                // Drop the partition from the metastore
                                client.dropPartition(dbName, tableOnlyName, partition.getValues(), true);
                                deletedCount++;
                                LOG.info("Dropped partition: {}", partition.getValues());
                            }
                        }
                    } catch (Exception e) {
                        LOG.error("Failed to process partition ({}): {}", partition.getValues(), e.getMessage());
                    }
                }
            }
            return "Done: successfully dropped " + deletedCount + " partition(s)";
        } catch (IOException | TException e) {
            LOG.error("Execution failed: {}", e.getMessage());
            return "Error: execution failed - " + e.getMessage();
        } finally {
            // Release resources
            if (fs != null) {
                try {
                    fs.close();
                } catch (IOException e) {
                    LOG.error("Failed to close the HDFS connection: {}", e.getMessage());
                }
            }
            if (client != null) {
                client.close();
            }
        }
    }

    // Resolve partition key/value pairs (fixes the earlier static-method issue)
    private Map<String, String> getPartitionValues(HiveMetaStoreClient client, Partition partition) {
        Map<String, String> values = new HashMap<>();
        List<String> partitionVals = partition.getValues();
        try {
            // Reuse the existing client instance to fetch the table metadata
            Table table = client.getTable(partition.getDbName(), partition.getTableName());
            List<FieldSchema> partitionKeys = table.getPartitionKeys();
            for (int i = 0; i < Math.min(partitionKeys.size(), partitionVals.size()); i++) {
                values.put(partitionKeys.get(i).getName(), partitionVals.get(i));
            }
        } catch (TException e) {
            LOG.error("Failed to fetch partition keys: {}", e.getMessage());
        }
        return values;
    }

    // Build the HDFS path of a partition
    private String buildPartitionPath(String tableLocation, List<String> partitionValues, List<FieldSchema> partitionKeys) {
        StringBuilder pathBuilder = new StringBuilder(tableLocation);
        for (int i = 0; i < partitionValues.size(); i++) {
            if (i < partitionKeys.size()) {
                pathBuilder.append("/").append(partitionKeys.get(i).getName())
                        .append("=").append(partitionValues.get(i));
            }
        }
        return pathBuilder.toString();
    }
}

3. Creating and updating the function

-- Create the function
add jar hdfs:///<hdfs_path>/<jar_name>.jar;
CREATE FUNCTION del_dt AS 'org.udf.del_dt';

-- Update the function (after replacing the jar)
DELETE jar hdfs:///<hdfs_path>/<jar_name>.jar;
add jar hdfs:///<hdfs_path>/<jar_name>.jar;
drop FUNCTION del_dt;
CREATE FUNCTION del_dt AS 'org.udf.del_dt';
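
As an alternative to the session-level add jar above, Hive 0.13+ (and therefore 1.2.1) can bind the jar to a permanent function at creation time, so clients do not have to add the jar themselves. A minimal sketch, using the same placeholder path:

-- Permanent function bound to the jar on HDFS (placeholder path)
CREATE FUNCTION del_dt AS 'org.udf.del_dt'
USING JAR 'hdfs:///<hdfs_path>/<jar_name>.jar';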

4. Usage example

-- Drop historical partitions of the dwd_abc_df table, keeping the last 7 days
hive> SELECT del_dt('dwd_abc_df', 7);
-- Output
OK
Done: successfully dropped 0 partition(s)
Time taken: 0.192 seconds
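
To confirm the result of a run, the surviving partitions can be listed for the same table used in the example above:

-- Verify the remaining partitions
SHOW PARTITIONS dwd_abc_df;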