当前位置: 首页 > news >正文

【Python模块】——pymysql

pymysql是python操作mysql的标准库,可以通过pip install快速导入pymysql包操作数据库

使用pymysql操作mysql

简单demo

import pymysql
connect = pymysql.connect(host="localhost",port=3306,user="root",password="root",database="my_database",# charset="utf8mb4"
)
cursor = connect.cursor()# 查询语句1
sql = "select * from user where name = %(name)s"
ret = cursor.execute(sql, {"name": "ls"})
# 查询语句2
sql = "select * from user where name = %s"
ret = cursor.execute(sql, "ls")
print(ret)result = cursor.fetchall()
print("result", result)cursor.close()
connect.close()

自定义SqlHelper

import pymysqlclass MySQLClient(object):def __init__(self, **kwargs):self.conn = pymysql.connect(**kwargs)self.cursor = self.conn.cursor()def query(self, sql, *args):try:rowcount = self.cursor.execute(sql, *args)return rowcountexcept Exception as e:raise edef update(self, sql, *args):self.cursor.execute(sql, *args)self.conn.commit()def insert(self, sql, *args):self.cursor.execute(sql, *args)self.conn.commit()def fetch_one(self, sql, *args):self.query(sql, *args)result = self.cursor.fetchone()return resultdef fetch_all(self, sql, *args):self.query(sql, *args)result = self.cursor.fetchone()return resultdef close(self):self.cursor.close()self.conn.close()config = {"host": "localhost","port": 3306,"user": "root","password": "root","database": "my_database",
}mysql_client = MySQLClient(**config)
sql = "select * from user where name=%s"
ret = mysql_client.fetch_one(sql, "ls")
print(ret)# mysql_client.close()

借助DButils创建数据库连接池

DButils模块可以通过创建数据库连接池,提升数据库操作性能;
实现思路:

  1. 定义SqlHelper类
  2. 通过__init__方法定义pool=PoolDB(**kwargs),_local=threading.local()
  3. 定义__enter__获取connection与cursor和__exit__关闭connection与cursor,可支持with 上下文操作
  4. 为了保证每次获取的connection与cursor不会将之前的覆盖掉,引入threading.local进行保存;self._local = {thread_id: {“stack”: [(connection, cursor)]}}
#!/usr/bin/env python  
# -*- coding:utf-8 -*-  
import pymysql
from dbutils.pooled_db import PooledDB
from threading import localclass SqlHelper(object):def __init__(self):self.pool = PooledDB(creator=pymysql,  # 使用链接数据库的模块maxconnections=5,  # 连接池允许的最大连接数,0和None表示不限制连接数mincached=1,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建# maxcached=5,  # 链接池中最多闲置的链接,0和None不限制blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制setsession=[],  # 开始会话前执行的命令列表host='localhost',port=3306,user='root',password='root',database='my_database',charset='utf8')self._local = local()def open(self):connection = self.pool.connection()cursor = connection.cursor()return  connection, cursordef close(self, cursor, conn):cursor.close()conn.close()def __enter__(self):conn, cursor = self.open()rv =  getattr(self._local, "stack", None)if not rv:self._local.stack = [(conn, cursor)]else:self._local.stack.append((conn, cursor))return cursordef __exit__(self, exc_type, exc_val, exc_tb):rv = getattr(self._local, "stack", None)if not rv:# del self._local.stackreturnelif len(rv) == 1:conn, cursor = rv[-1]# del self._local.stackreturnelse:conn, cursor = rv.pop()cursor.close()conn.close()def fetchone(self, sql, *args):conn, cursor = self.open(self)try:rowcount = cursor.execute(sql, *args)ret = cursor.fetchone()return retexcept Exception as e:raisedef fetchall(self, sql, *args):conn, cursor = self.open(self)try:rowcount = cursor.execute(sql, *args)ret = cursor.fetchall()return retexcept Exception as e:raisedb = SqlHelper()sql = "select * from user"
with db as c1:ret = c1.execute(sql)print(ret)with db as c2:ret = c2.execute(sql)print(ret)

使用DButils的另一种写法

使用这种写法,每次都实例化SqlHelper,保证每次获取的connection和cursor不被覆盖

#!/usr/bin/env python  
# -*- coding:utf-8 -*-  
""" 
1. 定义全局变量POOL=pooledDB(**kwargs)
2. 每次用到db就实例化一次
"""
import pymysql
from dbutils.pooled_db import PooledDB
from threading import localpool = PooledDB(creator=pymysql,  # 使用链接数据库的模块maxconnections=0,  # 连接池允许的最大连接数,0和None表示不限制连接数mincached=1,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建# maxcached=5,  # 链接池中最多闲置的链接,0和None不限制blocking=False,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制setsession=[],  # 开始会话前执行的命令列表host='localhost',port=3306,user='root',password='root',database='my_database',charset='utf8')class SqlHelper(object):def __init__(self):self.conn = Noneself.cursor = Nonedef open(self):self.connection = pool.connection()self.cursor = self.connection.cursor()return  self.connection, self.cursordef close(self):self.cursor.close()self.conn.close()def __enter__(self):self.conn, self.cursor = self.open()return self.cursordef __exit__(self, exc_type, exc_val, exc_tb):self.close()db = SqlHelper()sql = "select * from user"
with db as c1:ret = c1.execute(sql)print("c1.cursor: ", db.cursor)print(ret)with db as c2:ret = c2.execute(sql)print("c2.cursor: ", db.cursor)  # 一个实例对象是可以多次调用enter方法的,但db.cursor发生了改变,即上一次的连接丢了print(ret)print(type(c1), type(c2))print(c1 is c2) # falseprint("c1.cursor: ", db.cursor) # c2.cursor将c1.cursor覆盖了
http://www.lryc.cn/news/542993.html

相关文章:

  • 【我的Android进阶之旅】Android Studio SDK Update Site 国内的腾讯云镜像配置指南
  • springboot实现多文件上传
  • Webpack打包优化
  • 浅谈HTTP及HTTPS协议
  • GTID的基本概念
  • .NET Core MVC IHttpActionResult 设置Headers
  • 数据结构与算法面试专题——桶排序
  • 深度学习奠基作 AlexNet 论文阅读笔记(2025.2.25)
  • MongoDB 数据库简介
  • Transformer LLaMA
  • 【DeepSeek开源:会带来多大的影响】
  • Redis7——基础篇(七)
  • 边缘计算:通俗易懂的全方位解析
  • Flink 中的滚动策略(Rolling Policy)
  • GPU和FPGA的区别
  • 网易云音乐分布式KV存储实践与演进
  • WordPress平台如何接入Deepseek,有效提升网站流量
  • 【嵌入式】STM32内部NOR Flash磨损平衡与掉电保护总结
  • 什么是磁盘阵列(RAID)?如何提高磁盘阵列的性能
  • 轻量级日志管理平台Grafana Loki
  • k8s集群部署
  • STM32MP157A-FSMP1A单片机移植Linux系统SPI总线驱动
  • 系统基础与管理(2025更新中)
  • Python--内置函数与推导式(下)
  • 可狱可囚的爬虫系列课程 14:10 秒钟编写一个 requests 爬虫
  • Windows golang安装和环境配置
  • IP-------GRE和MGRE
  • LabVIEW形状误差测量系统
  • django校园互助平台~源码
  • Vue进阶之AI智能助手项目(五)——ChatGPT的调用和开发