Python Peewee库连接和操作MySQL数据库
目录
- Peewee 的主要特性:
- 典型应用场景:
- docker 启动数据库
- 环境准备
- crud操作
Peewee 是一个轻量级且表达力强的 Python 对象关系映射(ORM)库,它允许开发者使用 Python 对象和表达式来操作关系型数据库,从而在大多数情况下避免了手动编写原始 SQL 语句的需求。
Peewee 的主要特性:
- 简洁性:Peewee 设计简洁,易于理解和集成到项目中。
- 强大的查询 API:提供直观、灵活的查询构建方式,通过 Python 表达式进行数据库操作。
- 支持多种数据库:支持 SQLite、MySQL、PostgreSQL 等主流数据库,部分扩展还支持更多类型。
- 模型驱动设计:通过定义类对应数据库表结构,实现类型安全的数据操作。
- 轻量高效:相比 SQLAlchemy 等大型 ORM,Peewee 更加轻便,专注于核心功能。
- 文档完善:拥有结构清晰的文档和丰富的示例,社区活跃。
- 支持迁移:内置数据库结构变更管理(需配合插件如
playhouse
使用)。 - 关系处理良好:对表之间的关联(如外键)有良好的支持。
典型应用场景:
- 中小型应用,尤其是不需要完整 ORM 功能的场景。
- 快速原型开发或脚本任务中涉及数据库操作的部分。
- 需要简单易用 ORM 的项目。
docker 启动数据库
docker-compose.yml
services:mysql:image: mysql:8.0restart: alwaysenvironment:MYSQL_ROOT_PASSWORD: rootMYSQL_DATABASE: test_dbMYSQL_USER: test_userMYSQL_PASSWORD: test_passwordports:- "3307:3306"volumes:- mysql-data:/var/lib/mysqlvolumes:mysql-data:
启动
docker-compose up -d
环境准备
uv init
uv venv
.venv/Scripts/activate
uv pip install peewee pymysql cryptography notebook
crud操作
from peewee import MySQLDatabase, Model, CharField, IntegerField# 创建数据库连接
db = MySQLDatabase('test_db', # 数据库名user='test_user', # 用户名password='test_password', # 密码host='localhost', # 你Docker容器映射到了主机的33端口port=3307, # 主机端口
)class BaseModel(Model):class Meta:database = dbclass User(BaseModel):name = CharField()age = IntegerField()# 初始化数据库(创建表)
def init_db():db.connect()db.create_tables([User], safe=True)print("数据库和表初始化完毕")init_db()
数据库和表初始化完毕
def create_user(name, age):user = User.create(name=name, age=age)print(f"创建用户: {user.id} - {user.name}, {user.age}")return useru1 = create_user("Alice", 25)
u2 = create_user("Bob", 30)
创建用户: 4 - Alice, 25
创建用户: 5 - Bob, 30
# 读取记录
def get_all_users():users = User.select()for user in users:print(f"{user.id}: {user.name}, {user.age}")print("所有用户:")
get_all_users()
所有用户:
4: Alice, 25
5: Bob, 30
# 更新记录
def update_user(user_id, new_age):query = User.update(age=new_age).where(User.id == user_id)rows = query.execute()print(f"更新了 {rows} 条记录")# 改
update_user(u1.id, 26)
更新了 1 条记录
# 删除记录
def delete_user(user_id):rows = User.delete().where(User.id == user_id).execute()print(f"删除了 {rows} 条记录")# 删
delete_user(u2.id)
删除了 1 条记录
# 再查
print("更新/删除后用户:")
get_all_users()
更新/删除后用户:
4: Alice, 26
def delete_all_users():rows = User.delete().execute()print(f"已删除 {rows} 个用户")delete_all_users()
已删除 1 个用户
db.close()
True