当前位置: 首页 > news >正文

Python读取hbase数据库

1. hbase连接

首先用hbase shell 命令来进入到hbase数据库,然后用list命令来查看hbase下所有表,以其中表“DB_level0”为例,可以看到库名“baotouyiqi”是拼接的,python代码访问时先连接:

def hbase_connection(hbase_master, hbase_port, table_prefix=None):connection = happybase.Connection(host=hbase_master, port=hbase_port, table_prefix=table_prefix)return connection
connection = hbase_connection(hbase_master, hbase_port, table_prefix)  # 在连接的时候创建项目空间
table = connection.table(tablename)  # 获取表连接

备注:完整代码在最后,想运行的直接滑倒最后复制即可

2. 按条件读取hbase数据

然后按照条件来查询表中想要的数据集,这里只列举两个条件:时间区间和指定列。同样,我们在shell下用scan命令来查看表中的数据结构:

可以看到第一列是ROW,第二列是COLUMN+CELL,python代码取数据方法差不多:

date_prex_start = bytes('dt_' + starttime, encoding='utf-8')  # row_start
date_prex_end = bytes('dt_' + endtime, encoding='utf-8')  # row_stop
# 通过设置row key的前缀row_prefix参数来进行局部扫描
outdata = dict(table.scan(row_start=date_prex_start, row_stop=date_prex_end,columns=[onecolumn]))

得到的结果如下,是个字典格式:

3. 按格式输出hbase数据结果

我们希望输出的结果是dataframe的,而且第一列是time,第二列是value,所以就做个简单格式处理:

timesep = list(map(lambda x: x.decode('utf-8').replace('dt_', ''), outdata.keys()))
tempdata = list(outdata.values())
valuelist = list(map(lambda x: float(list(x.values())[0]), tempdata))
if len(timesep) > 0:db_data2 = pd.DataFrame({'时间': timesep, onecolumn: valuelist})db_data2.loc[:, '时间2'] = [i[:16] for i in db_data2['时间']]db_data2 = db_data2.drop_duplicates(subset=['时间2'], keep='last')  # 一分钟内多次数值取一个即可
else:db_data2 = pd.DataFrame()
if len(db_data2) < 1:return pd.DataFrame()
db_data2.loc[:, '时间戳'] = [time.mktime(time.strptime(i, "%Y-%m-%d %H:%M:%S")) for i in db_data2['时间']]
db_data2 = db_data2.sort_values(by=['时间戳'], ascending=False)  # 将最新的数值放最前面
db_data3 = db_data2.drop(columns=['时间2', '时间戳'])
db_data3.columns = ['time', 'value']

4. 完整代码(code)

import happybase
import time
import pandas as pd
from pathlib import Pathos_file_name = Path(__file__).namedef hbase_connection(hbase_master, hbase_port, table_prefix=None):connection = happybase.Connection(host=hbase_master, port=hbase_port, table_prefix=table_prefix)return connectiondef get_data_by_tum(hbase_master, hbase_port, table_prefix, tablename, columnslist, starttime, endtime):columnsid = '$'.join(columnslist)onecolumn = 'TimeSe:dt_' + columnsid  # columnconnection = hbase_connection(hbase_master, hbase_port, table_prefix)  # 在连接的时候创建项目空间table = connection.table(tablename)  # 获取表连接date_prex_start = bytes('dt_' + starttime, encoding='utf-8')  # row_startdate_prex_end = bytes('dt_' + endtime, encoding='utf-8')  # row_stop# 通过设置row key的前缀row_prefix参数来进行局部扫描outdata = dict(table.scan(row_start=date_prex_start, row_stop=date_prex_end,columns=[onecolumn]))timesep = list(map(lambda x: x.decode('utf-8').replace('dt_', ''), outdata.keys()))tempdata = list(outdata.values())valuelist = list(map(lambda x: float(list(x.values())[0]), tempdata))if len(timesep) > 0:db_data2 = pd.DataFrame({'时间': timesep, onecolumn: valuelist})db_data2.loc[:, '时间2'] = [i[:16] for i in db_data2['时间']]db_data2 = db_data2.drop_duplicates(subset=['时间2'], keep='last')  # 一分钟内多次数值取一个即可else:db_data2 = pd.DataFrame()if len(db_data2) < 1:return pd.DataFrame()db_data2.loc[:, '时间戳'] = [time.mktime(time.strptime(i, "%Y-%m-%d %H:%M:%S")) for i in db_data2['时间']]db_data2 = db_data2.sort_values(by=['时间戳'], ascending=False)  # 将最新的数值放最前面db_data3 = db_data2.drop(columns=['时间2', '时间戳'])db_data3.columns = ['time', 'value']return db_data3if __name__ == '__main__':begin_time = '2023-08-22 00:00:00'end_time = '2023-08-23 00:00:00'hbase_master = "142.21.8.22"hbase_port = 9097table_prefix = "baotouyiqi"table_name = "DB_level0"onedata = ["62340", "20", "204"]dataget = get_data_by_tum(hbase_master, hbase_port, table_prefix, table_name,onedata, begin_time, end_time)print(dataget)

http://www.lryc.cn/news/309449.html

相关文章:

  • LeetCode41题:缺失的第一个正数(python3)
  • C# DataTable 对象操作
  • web运行时安全
  • FPGA 与 数字电路的关系 - 这篇文章 将 持续 更新 :)
  • 18 SpringMVC实战
  • Rocky Linux 运维工具 dnf
  • 浅谈 Linux fork 函数
  • golang 装饰器模式详解
  • 刷题笔记day27-回溯算法2
  • 前端架构: 脚手架命令行交互核心实现之inquirer和readline的应用教程
  • 【C++初阶】内存管理
  • 《PyTorch深度学习实践》第十二讲循环神经网络基础
  • 蓝桥杯算法题汇总
  • 【MySQL】学习多表查询和笛卡尔积 - 副本
  • C++设计模式_创建型模式_工厂方法模式
  • matlab批量替换txt文本文件的特定行的内容
  • Qt Creator配置MSVC编译环境、调试环境
  • Linux系统运维命令:终止监听在 TCP端口80上的所有进程(使用lsof,grep,awk组合命令, 终止监听在 TCP某个端口上的所有进程)
  • 开源模型应用落地-业务优化篇(七)
  • 序列化-反序列化--json-xml-protoBuf
  • ubuntu 配置nacos开机启动
  • 单节点大数据平台运维脚本
  • HTML基础知识
  • 牛客禁用题:求阶乘
  • spring.factories的常用配置项
  • 数据库-第二/三章 关系数据库和标准语言SQL【期末复习|考研复习】
  • 【办公类-21-05】20240227单个word按“段落数”拆分多个Word(成果汇编 只有段落文字 1拆5)
  • 【前端素材】推荐优质后台管理系统网页my-Task平台模板(附源码)
  • Linux高负载排查最佳实践
  • 【python开发】网络编程(上)