当前位置: 首页 > news >正文

Python一些可能用的到的函数系列132 ORM-sqlalchemy连clickhouse

说明

继续ORM的转换

通过ORM,可以:

  • 1 用几乎一样的方式来操作不同的数据库
  • 2 可以提供One的处理模式

内容

同步方式

这种方式更简单,适合处理小批量任务。这种操作严格来说,不是严格的One,而是MiniBatch,只是在某些时候,例如我自己的Interative Table,可以把这种方式视为One。真正的One还是要通过下面的异步方式来实现。

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, func
from sqlalchemy.orm import sessionmaker,declarative_base
from datetime import datetimedb_url = f"postgresql://USER:PASSWD@IP:PORT/postgres"# from urllib.parse import quote_plus
# the_passed = quote_plus('!@#*')
# # 创建数据库引擎
pg_engine = create_engine(db_url)# 创建基类
Base = declarative_base()# 定义数据模型
class NewsContent(Base):__tablename__ = 'some_table'__table_args__ = {'schema': 'some_schema'}  # 指定模式id = Column(Integer, primary_key=True)mid = Column(String)content = Column(String)created = Column(DateTime)def dict(self):data_dict = {}data_dict['id'] = self.id data_dict['mid'] = self.mid data_dict['content'] = self.content data_dict['created'] = self.created return data_dict # 创建表
Base.metadata.create_all(pg_engine)# 创建会话
Session = sessionmaker(bind=pg_engine)
session = Session()# 随机选取100条数据 order_by(func.random()) 数据集太大或者索引没建好可能会非常慢
# random_news = session.query(NewsContent).limit(100).all()# >>>> 采用select in 的方式
# 查询最大id
max_id = session.query(func.max(NewsContent.id)).scalar()print(f"The maximum id is: {max_id}")import random# 定义范围和选择的数量
start = max_id-10000000
end = max_id
num_samples = 200# 从指定范围中随机选择
random_samples = random.sample(range(start, end + 1), num_samples)print(random_samples)# # 查询 ID 在集合中的记录
filtered_news = session.query(NewsContent).filter(NewsContent.id.in_(random_samples)).all()
filtered_news1 = [x.dict() for x in filtered_news ]

效果很好,速度很快。

异步方式

之后如果上生产了可以考虑这种方式

要使用 SQLAlchemy 异步连接 PostgreSQL,你可以结合 asyncio 和 SQLAlchemy 的异步支持。你使用的是 SQLAlchemy 2.x,而 SQLAlchemy 从 1.4 版本开始引入了对异步编程的支持,并在 2.x 版本中进一步强化和完善了这一功能。因此,SQLAlchemy 2.0.30 版本已经完全支持异步操作,特别是结合 asyncio 事件循环来异步连接数据库,如 PostgreSQL。

所以你当前使用的 SQLAlchemy 2.x 版本已经可以支持我之前提到的基于 asyncpg 的异步连接 PostgreSQL。这个版本的异步功能是通过 AsyncSession、create_async_engine 等接口来实现的。

以下是如何使用 SQLAlchemy 异步连接 PostgreSQL 的步骤:

1. 安装所需依赖

确保你已经安装了 asyncpg 和 SQLAlchemy 版本 1.4+ 及其他相关依赖。

pip install SQLAlchemy[asyncio] asyncpg

2. 配置异步连接

使用 async_engineAsyncSession 来配置异步连接。

示例代码

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String# 定义模型的基础类
Base = declarative_base()# 定义数据库模型
class User(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True, autoincrement=True)name = Column(String)# 异步连接数据库
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"# 创建异步引擎
engine = create_async_engine(DATABASE_URL, echo=True)# 创建异步会话工厂
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False
)# 异步函数:数据库交互
async def async_main():async with async_session() as session:async with session.begin():# 创建新用户new_user = User(name="Async User")session.add(new_user)# 查询数据result = await session.execute("SELECT * FROM users")users = result.fetchall()print(users)# 启动异步事件循环
asyncio.run(async_main())

主要步骤解析:

  1. 创建异步引擎create_async_engine() 使用 postgresql+asyncpg 连接字符串。
  2. 异步会话:使用 sessionmaker() 创建异步会话工厂,结合 AsyncSession 实现数据库的异步交互。
  3. 异步操作:在 async_main() 中进行异步的数据库操作,如查询和添加数据。

注意事项:

  • 异步操作必须在 async 函数中执行,并通过 await 语句异步地进行数据库操作。
  • 数据库的连接字符串需要使用 postgresql+asyncpg 来指定 asyncpg 驱动。

这种方式可以有效利用异步 I/O,提高数据库操作的性能。

http://www.lryc.cn/news/434590.html

相关文章:

  • 华为 HCIP-Datacom H12-821 题库 (12)
  • pointpillar部署-TensorRT实现(三)
  • Java学习中,为什么会混淆类方法和实例方法,应该怎么办?
  • 【人工智能学习笔记】4_3 深度学习基础之循环神经网络
  • 解锁生活密码,AI答案之书解决复杂难题
  • Android Radio2.0——公告监听设置(四)
  • EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework
  • 【VUE】实现当前页面刷新,刷新当前页面的两个方法(如何在一个页面写一个方法提供给全局其他地方调用)(如何重复调用同一个路由实现页面的重新加载)
  • 【科研小小白】灰度化处理、阈值、反色、二值化、边缘检测;平滑;梯度计算;双阈值检测;非极大值抑制
  • 数字经济时代,零售企业如何实现以消费者为中心的数字化转型?
  • 微积分复习笔记 Calculus Volume 1 - 1.5 Exponential and Logarithmic Functions
  • 代码随想录 刷题记录-24 图论 (1)理论基础 、深搜与广搜
  • MyBatis 缓存机制详解:原理、应用与优化策略
  • 跨越技术壁垒:EasyCVR为何选择支持FMP4格式,重塑视频汇聚平台标准
  • 美团OC感想
  • 搜维尔科技:AcuMap - 针灸模拟VR训练解决方案
  • WEB渗透权限维持篇-禁用Windows事件日志
  • 【设计模式】Template Method伪代码
  • 关于2023.9.2~2023.9.10学习总结与教训
  • NLTK:Python自然语言处理工具包及其参数使用详解
  • php 之 php-fpm 和 nginx结合使用
  • 数学建模笔记——TOPSIS[优劣解距离]法
  • 证书学习(四)X.509数字证书整理
  • 氚云,低代码领风者如何破解行业的“中式焦虑”?
  • “深入解析:MySQL半同步复制的配置指南与实践技巧“
  • 第四届长城杯部分wp
  • 打造无死角安防网:EasyCVR平台如何助力智慧警务实现视频+AI的全面覆盖
  • 批发订货系统源码怎么弄 门店订货系统小程序价格
  • 终端安全如何防护?一文为你揭晓答案!
  • 价值流架构指南:构建业务创新与竞争优势的全面方法论