当前位置: 首页 > news >正文

hbase 工具类

hbase 工具类

pom.xml

<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.5.10-hadoop3</version>
</dependency>
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>33.2.1-jre</version>
</dependency>

RowKey注解

package cn.lhz.util.annotation;import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.annotation.ElementType;/*** @author 李昊哲* @version 1.0.0*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface RowKeyAnnotation {
}

hbase 自定义 过滤器

package cn.lhz.util.hbase;import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.Cell;/*** hbase 自定义 过滤器** @author 李昊哲* @version 1.0.0*/
public class CustomFilter extends FilterBase {private final String targetValue;public CustomFilter(String targetValue) {this.targetValue = targetValue;}@Overridepublic ReturnCode filterCell(Cell cell) {// 在这里实现过滤逻辑// 比如,如果单元格的值等于 targetValue,则允许它通过byte[] value = cell.getValueArray();String cellValue = new String(value, cell.getValueOffset(), cell.getValueLength());if (cellValue.equals(targetValue)) {// 允许通过return ReturnCode.INCLUDE;} else {// 跳过此单元格return ReturnCode.SKIP;}}
}

hbase 工具类

package cn.lhz.util.hbase;import cn.lhz.util.annotation.RowKeyAnnotation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;
import java.lang.reflect.Field;
import java.util.*;
import java.util.regex.Pattern;/*** @author 李昊哲* @version 1.0.0*/
public class HbaseUtil {// 创建 hbase 配置 对象private final static Configuration conf = HBaseConfiguration.create();/*** 获取 Hbase 连接** @return Hbase 连接* @throws IOException IOException*/public static Connection getConnection() throws IOException {return ConnectionFactory.createConnection(conf);}/*** 获取 org.apache.hadoop.hbase.client.Admin 对象** @return org.apache.hadoop.hbase.client.Admin 对象* @throws IOException IOException*/public static Admin getAdmin() throws IOException {return getConnection().getAdmin();}/*** 获取 org.apache.hadoop.hbase.client.Admin 对象** @param connection Connection* @return org.apache.hadoop.hbase.client.Admin 对象* @throws IOException IOException*/public static Admin getAdmin(Connection connection) throws IOException {return connection.getAdmin();}/*** 断开 hbase 连接** @param connection Connection* @throws IOException IOException*/public static void closeConnection(Connection connection) throws IOException {if (connection != null) {connection.close();}}/*** 释放 org.apache.hadoop.hbase.client.Admin 对象** @param admin org.apache.hadoop.hbase.client.Admin 对象* @throws IOException IOException*/public static void closeAdmin(Admin admin) throws IOException {if (admin != null) {admin.close();}}/*** 释放集群管理对象和连接** @param admin      集群管理对象* @param connection 客户端与集群建立的连接* @throws IOException IOException*/public static void closeAdminAndConnection(Admin admin, Connection connection) throws IOException {closeAdmin(admin);closeConnection(connection);}/*** 获取 数据表** @param tableName 数据表名称* @return 数据表* @throws IOException IOException*/public static Table getTable(String tableName) throws IOException {return getConnection().getTable(TableName.valueOf(tableName));}/*** 获取 数据表** @param connection 客户端与集群建立的连接* @param tableName  数据表名称* @return 数据表* @throws IOException IOException*/public static Table getTable(Connection connection, String tableName) throws IOException {return connection.getTable(TableName.valueOf(tableName));}/*** 创建数据表** @param admin             集群管理对象* @param name              数据表名称* @param columnFamilyNames 一个或多个ColumnFamilyName* @throws IOException IOException*/public static void createTable(Admin admin, String name, String... columnFamilyNames) throws IOException {// 4.1、定义表名称TableName tableName = TableName.valueOf(name);// 4.2、构建表描述构造器TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);// 4.3、构建列簇描述构造器Collection<ColumnFamilyDescriptor> collection = new ArrayList<>();if (columnFamilyNames != null) {for (String columnFamilyName : columnFamilyNames) {// 4.4、构建列簇描述collection.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamilyName)).build());}}// 4.5、表结构构造器与列簇构造器建立关联tableDescriptorBuilder.setColumnFamilies(collection);// 4.6、定义描述对象TableDescriptor tableDescriptor = tableDescriptorBuilder.build();// 4.7、创建表admin.createTable(tableDescriptor);}/*** 删除数据表** @param admin 数据库管理对象* @param name  数据表名称* @return 是否删除* @throws IOException IOException*/public static boolean deleteTable(Admin admin, String name) throws IOException {// 定义表名称TableName tableName = TableName.valueOf(name);// 删除表 分两步 先禁用再删除if (admin.tableExists(tableName)) {// 禁用表admin.disableTable(tableName);// 删除表admin.deleteTable(tableName);}return true;}/*** 新增或更新数据** @param columnFamilyName columnFamilyName* @param e                数据表对应 javabean 对象* @param <E>              数据表对应 javabean 数据类型* @throws IOException            IOException* @throws IllegalAccessException IllegalAccessException*/public static <E> void upsert(Table table, String columnFamilyName, E e) throws IOException, IllegalAccessException {// 获取对象的 Class 对象Class<?> aClass = e.getClass();Put put = null;for (Field field : aClass.getDeclaredFields()) {// 设置允许访问私有属性field.setAccessible(true);// 获取属性 RowKeyAnnotation 注解RowKeyAnnotation annotation = field.getAnnotation(RowKeyAnnotation.class);if (annotation != null) {// 根据 RowKey 构建 Put 对象put = new Put(Bytes.toBytes(String.valueOf(field.get(e))));} else {if (put == null) {return;}// 添加 字段 与 字段对应的值put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(field.getName()), Bytes.toBytes(String.valueOf(field.get(e))));}}if (put != null) {// 插入数据table.put(put);}}/*** 新增或更新数据** @param connection       客户端与集群建立的连接* @param name             数据表名称* @param columnFamilyName columnFamilyName* @param e                数据表对应 javabean 对象* @param <E>              数据表对应 javabean 数据类型* @return upsert 是否成功* @throws IOException            IOException* @throws IllegalAccessException IllegalAccessException*/public static <E> boolean upsert(Connection connection, String name, String columnFamilyName, E e) throws IOException, IllegalAccessException {// 定义表名称TableName tableName = TableName.valueOf(name);// 连接 person 表Table table = connection.getTable(tableName);// 获取对象的 Class 对象Class<?> aClass = e.getClass();Put put = null;for (Field field : aClass.getDeclaredFields()) {// 设置允许访问私有属性field.setAccessible(true);// 获取属性 RowKeyAnnotation 注解RowKeyAnnotation annotation = field.getAnnotation(RowKeyAnnotation.class);if (annotation != null) {// 根据 RowKey 构建 Put 对象put = new Put(Bytes.toBytes(String.valueOf(field.get(e))));} else {if (put == null) {return false;}// 添加 字段 与 字段对应的值put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(field.getName()), Bytes.toBytes(String.valueOf(field.get(e))));}}if (put != null) {// 插入数据table.put(put);return true;}return false;}/*** 根据 RowKey 删除数据** @param connection 客户端与集群建立的连接* @param name       数据表名称* @param rowKey     数据表对应 RowKey* @return upsert 是否成功* @throws IOException IOException*/public static boolean delete(Connection connection, String name, String rowKey) throws IOException {// 定义表名称TableName tableName = TableName.valueOf(name);// 连接 数据表 表Table table = connection.getTable(tableName);// 根据 RowKey 构建 Delete 对象Delete delete = new Delete(Bytes.toBytes(rowKey));// 执行请求table.delete(delete);return true;}/*** 获取数据表中 ColumnFamily** @param admin 数据库管理对象* @param name  数据表名称* @return ColumnFamily 名称集合* @throws IOException IOException*/public static Set<String> columnFamilyNames(Admin admin, String name) throws IOException {// 指定表名TableName tableName = TableName.valueOf(name);// 获取表的描述对象TableDescriptor tableDescriptor = admin.getDescriptor(tableName);Set<String> columnFamilyNames = new HashSet<>();for (byte[] columnFamilyName : tableDescriptor.getColumnFamilyNames()) {columnFamilyNames.add(Bytes.toString(columnFamilyName));}return columnFamilyNames;}/*** 获取数据表中 column** @param connection 客户端与集群建立的连接* @param name       数据表名称* @return 字段名名称集合* @throws IOException IOException*/public static List<String> columnNames(Connection connection, String name) throws IOException {List<String> columnNames = new ArrayList<>();// 指定表名TableName tableName = TableName.valueOf(name);// 连接 数据表 表Table table = connection.getTable(tableName);// 获取表的描述对象Admin admin = connection.getAdmin();TableDescriptor tableDescriptor = admin.getDescriptor(tableName);// 获取列簇ColumnFamilyDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();for (ColumnFamilyDescriptor columnFamilyDescriptor : columnFamilies) {Result result = table.getScanner(columnFamilyDescriptor.getName()).next();for (Cell cell : result.rawCells()) {byte[] column = CellUtil.cloneQualifier(cell);columnNames.add(Bytes.toString(column));// String columnName = Bytes.toString(column);// String columnValue = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());}}return columnNames;}/*** 使用RowKey获取查询数据表中RowKey对应的记录** @param connection 客户端与集群建立的连接* @param name       数据表名称* @param rowKey     数据表RowKey* @param clazz      返回对象数据类型* @return 数据表模型类对象* @throws IOException IOException*/public static <T> Class<T> selectByRowKey(Connection connection, String name, String rowKey, Class<T> clazz) throws IOException, NoSuchFieldException, IllegalAccessException {Admin admin = connection.getAdmin();// 定义表名称TableName tableName = TableName.valueOf(name);// 连接 person 表Table table = connection.getTable(tableName);// 根据 RowKey 构建 Get 对象Get get = new Get(Bytes.toBytes(rowKey));// 执行请求 获取结果Result result = table.get(get);// 获取表的描述对象TableDescriptor tableDescriptor = admin.getDescriptor(tableName);// 获取列簇ColumnFamilyDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();for (ColumnFamilyDescriptor columnFamilyDescriptor : columnFamilies) {List<String> columnNames = new ArrayList<>();byte[] columnFamilyByteName = columnFamilyDescriptor.getName();Result rs = table.getScanner(columnFamilyDescriptor.getName()).next();for (Cell cell : rs.rawCells()) {byte[] column = CellUtil.cloneQualifier(cell);columnNames.add(Bytes.toString(column));}for (String columnName : columnNames) {// 解析结果String value = Bytes.toString(result.getValue(columnFamilyByteName, Bytes.toBytes(columnName)));Field field = clazz.getDeclaredField(nameInDb2nameInJava(columnName));field.setAccessible(true);field.set(clazz, value);}}return clazz;}/*** @param connection       客户端与集群建立的连接* @param name             数据表名称* @param columnFamilyName ColumnFamily 名称* @param columnName       字段名称* @param columnValue      字段匹配的值* @param clazz            数据表对应的模型类* @param cacheCount       缓存数量* @param <T>              返回值泛型* @return 查询结果* @throws IOException            IOException* @throws NoSuchFieldException   NoSuchFieldException* @throws IllegalAccessException IllegalAccessException*/public static <T> List<Class<T>> filter(Connection connection, String name, String columnFamilyName, String columnName, String columnValue, Class<T> clazz, int cacheCount) throws IOException, NoSuchFieldException, IllegalAccessException {List<Class<T>> list = new ArrayList<>();Table table = connection.getTable(TableName.valueOf(name));// 创建扫描对象Scan scan = new Scan();// 设置缓存scan.setCaching(cacheCount);// 创建自定义过滤器Filter customFilter = new CustomFilter(columnValue);scan.setFilter(customFilter);// 进行扫描try (ResultScanner scanner = table.getScanner(scan)) {for (Result result : scanner) {for (Field field : clazz.getDeclaredFields()) {field.setAccessible(true);field.set(clazz, null);}List<String> columnNames = new ArrayList<>();Result rs = table.getScanner(Bytes.toBytes(columnFamilyName)).next();for (Cell cell : rs.rawCells()) {byte[] column = CellUtil.cloneQualifier(cell);columnNames.add(Bytes.toString(column));}for (String columnTempName : columnNames) {// 解析结果String value = Bytes.toString(result.getValue(Bytes.toBytes(columnTempName), Bytes.toBytes(columnName)));Field field = clazz.getDeclaredField(nameInDb2nameInJava(columnName));field.setAccessible(true);field.set(clazz, value);}list.add(clazz);}}return list;}/*** 数据库里下划线命名规则转化为java里面驼峰式命名** @param filedName 字段名称* @return javabean属性名称*/public static String nameInDb2nameInJava(String filedName) {String coluname = filedName.toLowerCase();//正则if (Pattern.compile("^\\S+_+\\S+$").matcher(coluname).find()) {char[] ca = coluname.toCharArray();for (int j = 1; j < ca.length - 1; j++) {if (ca[j] == '_') {ca[j] = '\0';ca[j + 1] = Character.toUpperCase(ca[j + 1]);}}coluname = new String(ca);}return coluname.replaceAll("\0", "");}public static void main(String[] args) throws IOException {Admin admin = getAdmin();ServerName master = admin.getMaster();System.out.println(master.getHostname() + ":" + master.getPort());Collection<ServerName> regionServers = admin.getRegionServers();for (ServerName serverName : regionServers) {System.out.println(serverName.getHostname() + ":" + serverName.getPort());}}
}
http://www.lryc.cn/news/482025.html

相关文章:

  • 会议直击|美格智能受邀出席第三届无锡智能网联汽车生态大会,共筑汽车产业新质生产力
  • 在 Jupyter Notebook 中使用 Matplotlib 进行交互式可视化的教程
  • Android13 系统/用户证书安装相关分析总结(三) 增加安装系统证书的接口遇到的问题和坑
  • 【C++ 算法进阶】算法提升十三
  • 【计网不挂科】计算机网络期末考试(综合)——【选择题&填空题&判断题&简述题】完整试卷
  • 2024年11月中旬记录
  • 单体架构 IM 系统之长轮询方案设计
  • Android Studio加载旧的安卓工程项目报错处理
  • 阿里公告:停止 EasyExcel 更新与维护
  • Spring 中的 BeanWrapper
  • 2024鹏城杯msic部分WP
  • DAY23|回溯算法Part02|LeetCode: 39. 组合总和 、40.组合总和II 、131.分割回文串
  • go map
  • 三十七、Python基础语法(异常)
  • ThreadLocal的熟悉与使用
  • 如何使用 Puppeteer 和 Browserless 抓取亚马逊产品数据?
  • 使用Python求解经典“三门问题”,揭示概率的奇妙之处
  • 数据库基础(6) . DDL
  • 2024 年度分布式电力推进(DEP)系统发展探究
  • vue通过iframe方式嵌套grafana图表
  • 简单介绍下 Java 中的 @Validated 和 @Valid 注解的区别?
  • SpringBoot配置Rabbit中的MessageConverter对象
  • C++ 错题本--duplicate symbol问题
  • Cursor的chat与composer的使用体验分享
  • 【优选算法 — 滑动窗口】最大连续1的个数 将 x 减到0的最小操作数
  • 《TCP/IP网络编程》学习笔记 | Chapter 8:域名及网络地址
  • FastHTML快速入门:调试模式和 URL中的变量
  • C++高级编程(8)
  • AUTOSAR_EXP_ARAComAPI的7章笔记(2)
  • 【C++】 C++游戏设计---五子棋小游戏