Python使用PyMySQL操作MySQL完整指南
Python使用PyMySQL操作MySQL完整指南
1. 安装依赖
pip install pymysql
2. 基础配置和数据库操作
2.1 基础配置类
import pymysql
from typing import List, Dict, Optional
from datetime import datetimeclass MySQLDB:def __init__(self):self.conn = Noneself.cursor = Nonedef connect(self):"""连接数据库"""try:self.conn = pymysql.connect(host='localhost',port=3306,user='root',password='123456',charset='utf8mb4',cursorclass=pymysql.cursors.DictCursor # 返回字典格式)self.cursor = self.conn.cursor()except Exception as e:print(f"连接数据库失败:{e}")raise edef close(self):"""关闭数据库连接"""if self.cursor:self.cursor.close()if self.conn:self.conn.close()
2.2 数据库和表操作
def create_database(self):"""创建数据库"""try:sql = "CREATE DATABASE IF NOT EXISTS test DEFAULT CHARACTER SET utf8mb4"self.cursor.execute(sql)self.conn.select_db('test') # 选择数据库except Exception as e:print(f"创建数据库失败:{e}")raise edef drop_database(self):"""删除数据库"""try:sql = "DROP DATABASE IF EXISTS test"self.cursor.execute(sql)except Exception as e:print(f"删除数据库失败:{e}")raise edef create_table(self):"""创建用户表"""try:sql = """CREATE TABLE IF NOT EXISTS users (id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '自增主键',username VARCHAR(50) NOT NULL UNIQUE COMMENT '用户名',password VARCHAR(100) NOT NULL COMMENT '密码',age INT COMMENT '年龄',create_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',INDEX idx_username (username)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户表';"""self.cursor.execute(sql)self.conn.commit()except Exception as e:self.conn.rollback()print(f"创建表失败:{e}")raise e
3. CRUD操作
3.1 插入操作
def insert_user(self, username: str, password: str, age: int) -> int:"""插入单个用户"""try:sql = "INSERT INTO users(username, password, age) VALUES (%s, %s, %s)"self.cursor.execute(sql, (username, password, age))self.conn.commit()return self.cursor.lastrowid # 返回自增IDexcept Exception as e:self.conn.rollback()print(f"插入用户失败:{e}")raise edef batch_insert_users(self, users: List[Dict]):"""批量插入用户"""try:sql = "INSERT INTO users(username, password, age) VALUES (%s, %s, %s)"values = [(user['username'], user['password'], user['age']) for user in users]self.cursor.executemany(sql, values)self.conn.commit()except Exception as e:self.conn.rollback()print(f"批量插入用户失败:{e}")raise e
3.2 查询操作
def get_user_by_id(self, user_id: int) -> Optional[Dict]:"""根据ID查询用户"""try:sql = "SELECT * FROM users WHERE id = %s"self.cursor.execute(sql, (user_id,))return self.cursor.fetchone()except Exception as e:print(f"查询用户失败:{e}")raise edef get_users_by_age(self, min_age: int) -> List[Dict]:"""查询大于指定年龄的用户"""try:sql = "SELECT * FROM users WHERE age > %s"self.cursor.execute(sql, (min_age,))return self.cursor.fetchall()except Exception as e:print(f"查询用户失败:{e}")raise edef get_users_with_pagination(self, page: int, page_size: int) -> List[Dict]:"""分页查询用户"""try:offset = (page - 1) * page_sizesql = "SELECT * FROM users LIMIT %s OFFSET %s"self.cursor.execute(sql, (page_size, offset))return self.cursor.fetchall()except Exception as e:print(f"分页查询失败:{e}")raise e
3.3 更新操作
def update_user(self, user_id: int, password: str = None, age: int = None) -> bool:"""更新用户信息"""try:updates = []values = []if password is not None:updates.append("password = %s")values.append(password)if age is not None:updates.append("age = %s")values.append(age)if not updates:return Falsevalues.append(user_id)sql = f"UPDATE users SET {', '.join(updates)} WHERE id = %s"result = self.cursor.execute(sql, tuple(values))self.conn.commit()return result > 0except Exception as e:self.conn.rollback()print(f"更新用户失败:{e}")raise e
3.4 删除操作
def delete_user(self, user_id: int) -> bool:"""删除用户"""try:sql = "DELETE FROM users WHERE id = %s"result = self.cursor.execute(sql, (user_id,))self.conn.commit()return result > 0except Exception as e:self.conn.rollback()print(f"删除用户失败:{e}")raise e
4. 使用示例
def main():# 创建数据库实例db = MySQLDB()try:# 连接数据库db.connect()# 创建数据库和表db.create_database()db.create_table()# 插入单个用户user_id = db.insert_user("张三", "123456", 25)print(f"插入用户ID: {user_id}")# 批量插入用户users = [{"username": "李四", "password": "123456", "age": 26},{"username": "王五", "password": "123456", "age": 27}]db.batch_insert_users(users)# 查询用户user = db.get_user_by_id(user_id)print(f"查询到的用户: {user}")# 更新用户db.update_user(user_id, age=30)# 分页查询users = db.get_users_with_pagination(1, 10)print(f"分页查询结果: {users}")# 删除用户db.delete_user(user_id)except Exception as e:print(f"操作失败:{e}")finally:db.close()if __name__ == "__main__":main()
5. 事务处理示例
def transfer_money(self, from_id: int, to_id: int, amount: float):"""转账操作示例"""try:# 开始事务self.conn.begin()# 扣除转出账户金额sql1 = "UPDATE accounts SET balance = balance - %s WHERE id = %s AND balance >= %s"result1 = self.cursor.execute(sql1, (amount, from_id, amount))if result1 == 0:raise Exception("余额不足或账户不存在")# 增加转入账户金额sql2 = "UPDATE accounts SET balance = balance + %s WHERE id = %s"result2 = self.cursor.execute(sql2, (amount, to_id))if result2 == 0:raise Exception("转入账户不存在")# 提交事务self.conn.commit()return Trueexcept Exception as e:# 回滚事务self.conn.rollback()print(f"转账失败:{e}")raise e
6. 注意事项
-
连接管理
- 及时关闭数据库连接
- 使用连接池管理连接(可以使用 DBUtils 等库)
- 处理连接超时情况
-
安全性
- 使用参数化查询防止 SQL 注入
- 妥善保管数据库凭证
- 最小权限原则
-
性能优化
- 合理使用索引
- 批量操作代替循环单条操作
- 避免查询无用字段
-
错误处理
- 完善的异常处理机制
- 事务回滚
- 日志记录