当前位置: 首页 > news >正文

python pandas.DataFrame 直接写入Clickhouse

import pandas as pd
import sqlalchemy
from clickhouse_sqlalchemy import Table, engines
from sqlalchemy import create_engine, MetaData, Column
import urllib.parse

# Connection settings shared by both example engines below.
host = '1.1.1.1'
user = 'default'
password = 'default'
db = 'test'

# HTTP interface port.
port = 8123
engine = create_engine(
    'clickhouse://{user}:{password}@{host}:{port}/{db}'.format(
        user=user,
        host=host,
        # Quote the password so special characters survive inside the URL.
        password=urllib.parse.quote_plus(password),
        db=db,
        port=port,
    ),
    pool_size=30,
    max_overflow=0,
    pool_pre_ping=True,
    pool_recycle=3600,
)

# Native TCP/IP interface port.
port = 9000
# See https://github.com/xzkostyan/clickhouse-sqlalchemy/issues/129
engine1 = create_engine(
    'clickhouse+native://{user}:{password}@{host}:{port}/{db}'.format(
        user=user,
        host=host,
        password=urllib.parse.quote_plus(password),
        db=db,
        port=port,
    ),
    pool_size=30,
    max_overflow=0,
    pool_pre_ping=True,
    pool_recycle=3600,
)

# Reference documentation: https://github.com/xzkostyan/clickhouse-sqlalchemy
# pip install sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple
# pip install clickhouse-sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple


class ClickhouseDf(object):
    """Write a pandas DataFrame into a ClickHouse table through SQLAlchemy.

    The table definition is derived from the DataFrame's column dtypes and
    created with the configured clickhouse-sqlalchemy table engine.

    Keyword Args:
        table_engine: Name of the table engine to use; must be a key of
            ``engines_dict``. Defaults to ``"MergeTree"``.

    Raises:
        ValueError: If ``table_engine`` is not a key of ``engines_dict``.
    """

    def __init__(self, **kwargs):
        # Engine name -> clickhouse-sqlalchemy engine class.
        self.engines_dict = {
            "MergeTree": engines.MergeTree,
            "AggregatingMergeTree": engines.AggregatingMergeTree,
            "GraphiteMergeTree": engines.GraphiteMergeTree,
            "CollapsingMergeTree": engines.CollapsingMergeTree,
            "VersionedCollapsingMergeTree": engines.VersionedCollapsingMergeTree,
            "SummingMergeTree": engines.SummingMergeTree,
            "ReplacingMergeTree": engines.ReplacingMergeTree,
            "Distributed": engines.Distributed,
            "ReplicatedMergeTree": engines.ReplicatedMergeTree,
            "ReplicatedAggregatingMergeTree": engines.ReplicatedAggregatingMergeTree,
            "ReplicatedCollapsingMergeTree": engines.ReplicatedCollapsingMergeTree,
            "ReplicatedVersionedCollapsingMergeTree": engines.ReplicatedVersionedCollapsingMergeTree,
            "ReplicatedReplacingMergeTree": engines.ReplicatedReplacingMergeTree,
            "ReplicatedSummingMergeTree": engines.ReplicatedSummingMergeTree,
            "View": engines.View,
            "MaterializedView": engines.MaterializedView,
            "Buffer": engines.Buffer,
            "TinyLog": engines.TinyLog,
            "Log": engines.Log,
            "Memory": engines.Memory,
            "Null": engines.Null,
            "File": engines.File,
        }
        # Default engine choice.
        self.table_engine = kwargs.get("table_engine", "MergeTree")
        if self.table_engine not in self.engines_dict:
            raise ValueError("No engine for this table")

    def _createORMTable(self, df, name, con, schema, **kwargs):
        """Build the clickhouse-sqlalchemy ``Table`` matching ``df``.

        Args:
            df: DataFrame whose columns and dtypes define the table columns.
            name: Table name.
            con: Connection the table metadata is bound to.
            schema: Schema (database) passed to the metadata; may be None.

        Keyword Args:
            primary_key: List of column names flagged as primary key and
                handed to the table engine. Defaults to an empty list.

        Returns:
            The ``Table`` object; nothing is created on the server here.
        """
        # pandas dtype name -> SQLAlchemy column type.
        col_dtype_dict = {
            "object": sqlalchemy.Text,
            "int64": sqlalchemy.Integer,
            "int32": sqlalchemy.Integer,
            "int16": sqlalchemy.Integer,
            "int8": sqlalchemy.Integer,
            "int": sqlalchemy.Integer,
            "float64": sqlalchemy.Float,
            "float32": sqlalchemy.Float,
            "float16": sqlalchemy.Float,
            "float8": sqlalchemy.Float,
            "float": sqlalchemy.Float,
        }
        primary_key = kwargs.get("primary_key", [])
        # NOTE(review): MetaData(bind=...) is a SQLAlchemy 1.x API; the
        # create()/drop() calls in to_sql rely on this binding.
        metadata = MetaData(bind=con, schema=schema)

        columns = []
        for col in df.columns.tolist():
            # Any dtype without an explicit mapping is stored as text.
            col_type = col_dtype_dict.get(str(df.dtypes[col]), col_dtype_dict["object"])
            columns.append(Column(col, col_type, primary_key=col in primary_key))

        return Table(
            name,
            metadata,
            *columns,
            self.engines_dict[self.table_engine](primary_key=primary_key),
        )

    def _checkTable(self, name, con, schema):
        """Return 1 if the table exists according to ``EXISTS``, else 0.

        Args:
            name: Table name.
            con: Connection used to run the ``EXISTS`` statement.
            schema: Schema (database) name. When empty or None the bare table
                name is checked instead of producing ``EXISTS None.<name>``.
        """
        target = f"{schema}.{name}" if schema else name
        rows = con.execute(f"EXISTS {target}").fetchall()
        return 0 if rows == [(0,)] else 1

    def to_sql(self, df, name: str, con, schema=None, if_exists="fail", **kwargs):
        """Insert the rows of a DataFrame into a ClickHouse table.

        Args:
            df: DataFrame to write.
            name: Target table name.
            con: Connection used for the existence check, DDL and insert.
            schema: Schema (database) of the target table; may be None.
            if_exists: One of {'fail', 'replace', 'append'}, default 'fail'.
                'fail' raises if the table exists, 'replace' drops and
                recreates it, 'append' creates it only when missing.

        Keyword Args:
            primary_key: List of primary-key column names. For the MergeTree
                engine it defaults to the first DataFrame column, otherwise
                to an empty list.

        Raises:
            ValueError: If ``if_exists`` is not a supported value, or if it
                is 'fail' and the table already exists.
        """
        if if_exists not in ("fail", "replace", "append"):
            raise ValueError(f"'{if_exists}' is not valid for if_exists")

        # Only compute the default key when the caller did not supply one, so
        # an explicit primary_key never trips over a column-less DataFrame.
        if "primary_key" in kwargs:
            self.primary_key = kwargs["primary_key"]
        elif self.table_engine in ["MergeTree"]:
            # Engines that require a primary key; only MergeTree is handled
            # for now, the others are untested.
            self.primary_key = [df.columns.tolist()[0]]
        else:
            self.primary_key = []

        orm_table = self._createORMTable(df, name, con, schema, primary_key=self.primary_key)
        table_exists = self._checkTable(name, con, schema)

        # Create (or recreate) the table as requested.
        if if_exists == "fail":
            if table_exists:
                raise ValueError(f"table already exists :{name} ")
            orm_table.create()
        elif if_exists == "replace":
            if table_exists:
                orm_table.drop()
            orm_table.create()
        elif not table_exists:  # if_exists == "append"
            orm_table.create()

        # Author's note: over HTTP, None is automatically stored as an empty
        # string; over native TCP/IP it is not and the engine raises, so None
        # values must be filled in by the caller beforehand.
        records = df.to_dict(orient="records")
        if records:  # nothing to insert for an empty DataFrame
            con.execute(orm_table.insert(), records)


if __name__ == '__main__':
    # Usage example.
    cdf = ClickhouseDf()
    df = pd.DataFrame({'column1': [1, 2, 3], 'column2': ['A', 'B', 'C']})
    db = 'default'
    password = ''
    user = 'default'
    port = 9090
    host = '192.168.76.136'
    engine = create_engine(
        'clickhouse+native://{user}:{password}@{host}:{port}/{db}'.format(
            user=user,
            host=host,
            password=urllib.parse.quote_plus(password),
            db=db,
            port=port,
        ),
        pool_size=30,
        max_overflow=0,
        pool_pre_ping=True,
        pool_recycle=3600,
    )
    with engine.connect() as conn:
        cdf.to_sql(df, "table_name", conn, schema='default', if_exists="replace")
        # Read back on the same connection so no extra connection is leaked.
        rows = conn.execute("SELECT * FROM table_name").fetchall()
    print(rows)

1) 运行需要安装包

# pip install sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple
# pip install clickhouse-sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple
 

2)cdf.to_sql(df, "table_name", conn, schema='default', if_exists="replace")

这里的 schema 一定要写,因为判断表是否存在时使用的是如下语句:

if con.execute('EXISTS default.table_name').fetchall() == [(0,)]: —— 查询结果为 [(0,)] 表示表不存在,否则表示表已存在

参考链接: SQLAlchemy_clickhouse_sqlalchemy-CSDN博客

https://github.com/xzkostyan/clickhouse-sqlalchemy

http://www.lryc.cn/news/206681.html

相关文章:

  • 德语中第二虚拟式在主动态的形式,柯桥哪里可以学德语
  • [Python进阶] 消息框、弹窗:tkinter库
  • (免费领源码)java#Springboot#mysql装修选购网站99192-计算机毕业设计项目选题推荐
  • 生活废品回收系统 JAVA语言设计和实现
  • redhat/centos 配置本地yum源
  • FLStudio2024汉化破解版在哪可以下载?
  • Java 音频处理,音频流转音频文件,获取音频播放时长
  • Spring Boot发送邮件
  • 智慧矿山:AI算法助力!刮板机监测,生产效率和安全性提升!
  • Qt跨平台(统信UOS)各种坑解决办法
  • ORB-SLAM3算法1之Ubuntu18.04+ROS-melodic安装ORB-SLAM3及各种问题解决
  • git学习笔记之用命令行解决冲突
  • C语言中的内联汇编是什么?如何使用内联汇编进行底层编程?
  • react笔记基础部分(组件生命周期路由)
  • Sentinel授权规则和规则持久化
  • JVM(三) 垃圾回收
  • vue3中使用svg并封装成组件
  • 实验六:DHCP、DNS、Apache、FTP服务器的安装和配置
  • Python实验项目4 :面对对象程序设计
  • 用html、css和jQuery实现图片翻页的特效
  • awk 框架
  • 专业135总分400+西安交通大学信息与通信工程学院909/815考研经验分享
  • 在 Windows 用 Chrome System Settings 设置代理
  • Excel多线程导入数据库
  • Linux开机默认进入命令行或图形化模式
  • ajax请求的时候get 和post方式的区别?
  • 还不知道光场相机吗?
  • 软信天成:助力某制造企业建设产品主数据管理平台案例分享
  • C#WPFPrism框架导航应用实例
  • Centos安装gitlabce