当前位置: 首页 > news >正文

python sqlite3 连接池封装（支持多线程访问）

1. 封装 sqlite3

1.1. 依赖包引入

# -*- coding: utf-8 -*-
#import os
import datetime
import json
import logging
import sqlite3
import sys
from typing import Dict, List

1.2. 封装类

class SqliteTool(object):
    """sqlite3 helper that hands out connections from an internal list."""

    def __init__(self, host, database):
        """Remember the settings; no connection is opened here.

        :param host: label stored on the instance as ``_host``
        :param database: database name stored on the instance as ``_database``
        """
        self._pool = None  # no pool has been created yet
        self._maxconns = 6  # maximum number of connections kept in the pool
        self._host, self._database = host, database
        print("__init__", self._database)

1.3. 连接池操作

    def init_pool(self):'''初始化连接池'''try:logging.info('Begin to create {0} postgresql pool on:{1}.\n'.format(self._host, datetime.datetime.now()))pool = []for _ in range(self._maxconns):# check_same_thread=False 支持多线程conn = sqlite3.connect(self._database, check_same_thread=False)pool.append(conn)self._pool = pool#print("init_pool", self._maxconns, len(self._pool), self._pool)logging.info('SUCCESS: create {0} postgresql pool success on {1}.\n'.format(self._host, datetime.datetime.now()))except Exception as e:logging.error('ERROR: create {0} postgresql pool failed on {1}.\n'.format(self._host, datetime.datetime.now()))self.close_pool()sys.exit('ERROR: create postgresql pool error caused by {0}'.format(str(e)))def close_pool(self):'''关闭 pool'''if self._pool != None:for conn in self._pool:conn.close()def get_conn(self):if not self._pool:self.init_pool()return self._pool.pop()def close_conn(self, conn):if self._pool:self._pool.append(conn)

1.4. 增删改查

1.4.1. 创建表

    # 创建数据表def create_table(self, sql: str):"""创建表:param sql: create sql语句:return: True表示创建表成功"""print("create_table", sql)result = Falsetry:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)print("[create table success]")result = Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in create_table'.format(sql, str(e)))sys.exit('ERROR: create table from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return result

1.4.2. 删除表

    # 删除数据表def drop_table(self, sql: str):"""删除表:param sql: drop sql语句:return: True表示删除成功"""result = Falsetry:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)print("[drop table success]")result = Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in drop_table'.format(sql, str(e)))sys.exit('ERROR: drop table from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return result

1.4.3. 插入数据

    def exec_insert(self, sql):'''执行插入'''result = Falsetry:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)result =  Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_insert'.format(sql, str(e)))sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return resultdef exec_insert_plus(self, table: str, params: dict):'''执行插入'''result = Falsetry:key_tup = tuple(params.keys())key_str = ",".join(key_tup)# different with psqlval_str = ",".join(("?",)*len(key_tup))sql_str = "insert into " + table + " (" + key_str + ") values (" + val_str + ")"#val_tup = tuple(params.values())val_tup = ()for item in params.values():if type(item) == list:val_tup += (json.dumps(item),)elif type(item) == str:val_tup += (item,)elif type(item) == int:val_tup += (item,)else:val_tup += (item,)#val_tup.append(str(item))#print("exec_insert_plus", sql_str, val_tup)conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql_str, val_tup)result =  Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_insert_plus'.format(table, str(e)))sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return resultdef exec_insert_many(self, table: str, params: List[Dict]):def dict_to_str(tab: str, param: Dict):key_tup = tuple(param.keys())key_str = ",".join(key_tup)# different with psqlval_str = ",".join(("?",)*len(key_tup))sql_str = "insert into " + tab + " (" + key_str + ") values (" + val_str + ")"return sql_strdef dict_to_tuple(param: Dict):val_tup = ()for item in param.values():if type(item) == list:val_tup += (json.dumps(item),)elif type(item) == str:val_tup += (item,)elif type(item) == int:val_tup += (item,)else:val_tup += (item,)#val_tup.append(str(item))return val_tup'''执行插入'''result = Falseif len(params) <= 0:return resulttry:sql_str = 
dict_to_str(table, params[0])val_lst = []for param in params:val_lst.append(dict_to_tuple(param))print("exec_insert_many", sql_str, val_lst)conn = self.get_conn()cursor = conn.cursor()cursor.executemany(sql_str, val_lst)result =  Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_insert'.format(sql_str, str(e)))sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return result

1.4.4. 删除数据

	# sql = "DELETE from users where user_id='83f7d86b594e4b26a7196ab761afcc7c';"def exec_delete(self, sql):'''执行查询'''result = Falsetry:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)result =  Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_delete'.format(sql, str(e)))sys.exit('ERROR: delete data from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return result

1.4.5. 更新数据

	# 修改单个值# update tasks set status='running' where task_id='0791216839b04d5c88846817f78280cc';# 修改多个值# update tasks set status='running',score='10' where task_id='0791216839b04d5c88846817f78280cc';def exec_update(self, sql):'''执行更新'''result = Falsetry:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)result =  Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_update'.format(sql, str(e)))sys.exit('ERROR: update data from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return result

1.4.6. 查询数据

    # select * from users where user_name='hello';def exec_select(self, sql):'''执行查询'''try:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)result = cursor.fetchall()#result = cursor.fetchone()except Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_select'.format(sql, str(e)))sys.exit('ERROR: load data from database error caused {0}'.format(str(e)))finally:cursor.close()#conn.close()#print("exec_select", len(self._pool), self._pool)print("init_pool", self._maxconns, len(self._pool), self._pool)self.close_conn(conn)return result

1.4.7. 测试

    def test_select(self, sql):result = self.exec_select(sql)print("test_select", result)return result

2. 操作使用实例

# for test
from typing import Optional, List, Dict, Union
from pydantic import BaseModel, Field

# class ......  (the SqliteTool class from section 1 is elided here in the original)


class TaskInDB(BaseModel):
    """One row of the ``tests`` table."""
    task_id: str
    disabled: int


def _sql_quote(value):
    """Return *value* as a single-quoted SQL string literal (quotes doubled)."""
    return "'" + str(value).replace("'", "''") + "'"


def create_tests_table(db):
    """Create the ``tests`` table if it does not exist; returns db.create_table's result."""
    sql_str = "create table if not exists tests("
    sql_str += "task_id char(32) primary key,"
    sql_str += "disabled int not null"
    sql_str += ");"
    return db.create_table(sql_str)


def drop_tests_table(db):
    """Drop the ``tests`` table if it exists; returns db.drop_table's result."""
    sql_str = "drop table if exists tests;"
    return db.drop_table(sql_str)


def get_tests_indb(db, tasks: Union[List[str], str, None] = None):
    """Select rows from ``tests``.

    :param db: object providing ``exec_select(sql)``
    :param tasks: None for all rows, a single task id, or a list of task ids
    :return: whatever ``db.exec_select`` returns for the built statement
    :raises TypeError: when *tasks* is none of the supported types
    """
    if tasks is None:
        sql_str = "select * from tests;"
    elif isinstance(tasks, list):
        print("list", tasks)
        # Each id is emitted as a quoted literal so non-numeric ids work and
        # an embedded quote cannot break out of the statement.
        key_str = ",".join(_sql_quote(task) for task in tasks)
        sql_str = "select * from tests where task_id in (" + key_str + ");"
    elif isinstance(tasks, str):
        print("str", tasks)
        sql_str = "select * from tests where task_id =" + _sql_quote(tasks) + ";"
    else:
        raise TypeError("tasks must be None, a str or a list of str, not {0}".format(type(tasks).__name__))
    return db.exec_select(sql_str)


def create_tests_indb(db, tasks: List[TaskInDB]):
    """Insert all *tasks* into ``tests`` with one ``exec_insert_many`` call."""
    params = []
    for task in tasks:
        print("create_tests_indb", task)
        # The original keeps task.model_dump() as a commented-out alternative
        # (presumably for pydantic v2 -- confirm against the installed version).
        params.append(task.dict())
    return db.exec_insert_many("tests", params)


def main():
    """Rebuild the ``tests`` table, insert three rows and print them back."""
    dbhost = ""
    dbdatabase = "./test.db"
    db = SqliteTool(dbhost, dbdatabase)

    # Rebuild the table from scratch.
    if not drop_tests_table(db) or not create_tests_table(db):
        print("ERROR")

    # Insert three records in one batch.
    task_indb1 = TaskInDB(task_id="11111111", disabled=1)
    task_indb2 = TaskInDB(task_id="22222222", disabled=0)
    task_indb3 = TaskInDB(task_id="33333333", disabled=0)
    create_tests_indb(db, [task_indb1, task_indb2, task_indb3])

    # Query the records back.
    # The original keeps TaskInDB.model_fields as a commented-out alternative.
    key_tup = tuple(TaskInDB.__fields__.keys())
    # All rows; other call forms:
    #   get_tests_indb(db, tasks="11111111")
    #   get_tests_indb(db, tasks=["11111111", "22222222"])
    ret_tasks = get_tests_indb(db)
    for ret_task in ret_tasks:
        print(ret_task)
        # Rebuild the model by pairing field names with row values by position.
        task_indb = TaskInDB(**dict(zip(key_tup, ret_task)))
        print(task_indb)
    print("OK")


if __name__ == '__main__':
    main()
http://www.lryc.cn/news/287435.html

相关文章:

  • 亚马逊运营:如何通过自养号测评有效防关联,避免砍单
  • winfrom图像加速渲染时图像不显示
  • Redash 默认key漏洞(CVE-2021-41192)复现
  • Git学习笔记:3 git tag命令
  • 10年软件测试经验,该有什么新的职业规划?
  • 重构改善既有代码的设计-学习(四):简化条件逻辑
  • 【代码---利用一个小程序,读取文件夹中图片,将其合成为一个视频】
  • MVC 和 MVVM的区别
  • redis—Set集合
  • 【jetson笔记】vscode远程调试
  • 大数据处理流程包括哪些环节
  • C++入门篇章1(C++是如何解决C语言不能解决的问题的)
  • java复习篇 数据结构:链表第一节
  • 深入理解与运用Lombok的@Cleanup注解:自动化资源管理利器
  • 【LeetCode每日一题】2865. 美丽塔 I
  • Cute Http File Server 使用文章
  • c#算法(10)——求点到直线的距离
  • [小脚本] maya 命令行常用操作
  • 数据结构·单链表
  • Redis(秒杀活动、持久化之RDB、AOF)
  • Window安装Python和开发Pycharm
  • 技术驱动宠物健康:宠物在线问诊系统的高效搭建手册
  • 玩转k8s:yaml介绍
  • 【spdk】spdk compressdev测试
  • Linux中并发程序设计(进程的创建和回收、exec函数使用)
  • 2023年DevOps国际峰会暨 BizDevOps 企业峰会(DOIS北京站):核心内容与学习收获(附大会核心PPT下载)
  • pdf 转html 在线预览和查询
  • docker 体验怀旧游戏(魂斗罗等)
  • JS中判断数据类型总结以及方法封装
  • 【Midjourney】绘画风格关键词