当前位置: 首页 > news >正文

HBase客户端的批量写缓存BufferedMutator

HBase数据刷写 之前提到过这个方法,那么BufferedMutator是什么?又应该如何实现呢?

写缓存

HBase的每一个put操作实际上是一个RPC操作,将客户端的数据传输到服务器再返回结果,这只适用于小数据量的操作,如果数据量多的话,每次put都需要建立一次RPC的连接(TCP连接),而建立连接传输数据是需要时间的,因此减少RPC的调用可以提高数据传输的效率,减少建立连接的时间和IO消耗。

HBase的客户端API提供了写缓存区,put的数据一开始放在缓存区内,当数量到达指定的容量或者用户强制提交是才将数据一次性提交到HBase的服务器。这个缓冲区可以通过调用 HTable.setAutoFlush(false) 来开启。而新版HBbase的API中使用了BufferedMutator替换了老版的缓冲区,通过BufferedMutator对象提交的数据自动存放在缓冲区中。

BufferedMutator

通过获取 BufferedMutator 对象,并调用 mutator.mutate(List<Mutation> mutations) 方法来进行批量插入数据。可以使用 Put 类型的对象列表作为 mutations 参数进行插入。BufferedMutator 提供了自动管理缓冲区和写入操作的功能,可以提高插入数据的性能。

单次一张表批量写入

        Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", "zookeeperHost");final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {@Overridepublic void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {for (int i = 0; i < e.getNumExceptions(); i++) {LOG.info("Failed to sent put " + e.getRow(i) + ".");}}};BufferedMutatorParams params = new BufferedMutatorParams(TABLE).listener(listener);params.writeBufferSize(123123L);try {Connection conn = ConnectionFactory.createConnection(conf);BufferedMutator mutator = conn.getBufferedMutator(params);Put p = new Put(Bytes.toBytes("someRow"));p.addColumn(FAMILY, Bytes.toBytes("someQualifier"), Bytes.toBytes("some value"));mutator.mutate(p);mutator.close();conn.close();} catch (IOException e1) {// TODO Auto-generated catch blocke1.printStackTrace();}

单次多张表批量写入

private static Map<String, BufferedMutator> tableConnectionMgr = new ConcurrentHashMap<>();
private BufferedMutator getTableConnection(String tableName) throws IOException {if (tableConnectionMgr.get(tableName) != null) {return tableConnectionMgr.get(tableName);}Connection connection = ConnectionFactory.createConnection(config);BufferedMutator table = connection.getBufferedMutator(TableName.valueOf(tableName));tableConnectionMgr.put(tableName, table);log.info("hbase table: {} connect established!", tableName);return tableConnectionMgr.get(tableName);
}

http://www.lryc.cn/news/144261.html

相关文章:

  • 从多个角度详解map转为list
  • PHP用CURL发送Content-type为application/json的POST请求方法
  • 【程序猿书籍大放送:第二期】《强化学习:原理与Python实战》
  • SV-6002Y 网络对讲求助模块,带3W功放输出和一路30W功放输出
  • Nginx详解 二:配置文件部分
  • SMC_TRAFO_GantryCutter2 (FB) 带刀片旋向龙门
  • 『PyQt5-Qt Designer篇』| 07 Qt Designer中栅格布局和表单布局的使用
  • 无涯教程-分类算法 - 多项式逻辑回归模型函数
  • 【C++】开源:Box2D动力学库配置与使用
  • Druid连接池和Apache的DBUtils
  • 怎样快速选择正确的可视化图表?
  • 6路液体水位检测芯片VK36W6D SOP16 抗电源干扰及手机干扰特性好
  • 【设备树笔记整理6】中断系统中的设备树
  • 微信小程序下载后端返回的文件流
  • Autoware.universe部署04:universe传感器ROS2驱动
  • Spring boot如何工作
  • 代码随想录打卡—day45—【DP】— 8.29 完全背包应用
  • 2023.8.28日论文阅读
  • HAproxy(四十七)
  • Java实战场景下的ElasticSearch
  • 拓世科技集团 | “书剑人生”李步云学术思想研讨会暨李步云先生九十华诞志庆
  • 前端须知名词解释
  • React性能优化之memo缓存函数
  • 2023年高教社杯 国赛数学建模思路 - 案例:ID3-决策树分类算法
  • C# Emgu.CV 条码检测
  • VueRouter的基本使用
  • 网工笔记:快速认识7类逻辑接口
  • MySQL中的free链表,flush链表,LRU链表
  • mac使用VsCode远程连接服务器总是自动断开并要求输入密码的解决办法
  • Python爬虫分布式架构 - Redis/RabbitMQ工作流程介绍