definit_pool(self):'''初始化连接池'''try:logging.info('Begin to create {0} postgresql pool on:{1}.\n'.format(self._host, datetime.datetime.now()))pool =[]for _ inrange(self._maxconns):# check_same_thread=False 支持多线程conn = sqlite3.connect(self._database, check_same_thread=False)pool.append(conn)self._pool = pool#print("init_pool", self._maxconns, len(self._pool), self._pool)logging.info('SUCCESS: create {0} postgresql pool success on {1}.\n'.format(self._host, datetime.datetime.now()))except Exception as e:logging.error('ERROR: create {0} postgresql pool failed on {1}.\n'.format(self._host, datetime.datetime.now()))self.close_pool()sys.exit('ERROR: create postgresql pool error caused by {0}'.format(str(e)))defclose_pool(self):'''关闭 pool'''if self._pool !=None:for conn in self._pool:conn.close()defget_conn(self):ifnot self._pool:self.init_pool()return self._pool.pop()defclose_conn(self, conn):if self._pool:self._pool.append(conn)
1.4. 增删改查
1.4.1. 创建表
# 创建数据表defcreate_table(self, sql:str):"""创建表:param sql: create sql语句:return: True表示创建表成功"""print("create_table", sql)result =Falsetry:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)print("[create table success]")result =Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in create_table'.format(sql,str(e)))sys.exit('ERROR: create table from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return result
1.4.2. 删除表
# 删除数据表defdrop_table(self, sql:str):"""删除表:param sql: drop sql语句:return: True表示删除成功"""result =Falsetry:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)print("[drop table success]")result =Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in drop_table'.format(sql,str(e)))sys.exit('ERROR: drop table from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return result
1.4.3. 插入数据
defexec_insert(self, sql):'''执行插入'''result =Falsetry:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)result =Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_insert'.format(sql,str(e)))sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return resultdefexec_insert_plus(self, table:str, params:dict):'''执行插入'''result =Falsetry:key_tup =tuple(params.keys())key_str =",".join(key_tup)# different with psqlval_str =",".join(("?",)*len(key_tup))sql_str ="insert into "+ table +" ("+ key_str +") values ("+ val_str +")"#val_tup = tuple(params.values())val_tup =()for item in params.values():iftype(item)==list:val_tup +=(json.dumps(item),)eliftype(item)==str:val_tup +=(item,)eliftype(item)==int:val_tup +=(item,)else:val_tup +=(item,)#val_tup.append(str(item))#print("exec_insert_plus", sql_str, val_tup)conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql_str, val_tup)result =Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_insert_plus'.format(table,str(e)))sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return resultdefexec_insert_many(self, table:str, params: List[Dict]):defdict_to_str(tab:str, param: Dict):key_tup =tuple(param.keys())key_str =",".join(key_tup)# different with psqlval_str =",".join(("?",)*len(key_tup))sql_str ="insert into "+ tab +" ("+ key_str +") values ("+ val_str +")"return sql_strdefdict_to_tuple(param: Dict):val_tup =()for item in param.values():iftype(item)==list:val_tup +=(json.dumps(item),)eliftype(item)==str:val_tup +=(item,)eliftype(item)==int:val_tup +=(item,)else:val_tup +=(item,)#val_tup.append(str(item))return val_tup'''执行插入'''result =Falseiflen(params)<=0:return resulttry:sql_str = dict_to_str(table, params[0])val_lst =[]for param in params:val_lst.append(dict_to_tuple(param))print("exec_insert_many", sql_str, val_lst)conn = self.get_conn()cursor = conn.cursor()cursor.executemany(sql_str, val_lst)result =Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_insert'.format(sql_str,str(e)))sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return result
1.4.4. 删除数据
# sql = "DELETE from users where user_id='83f7d86b594e4b26a7196ab761afcc7c';"defexec_delete(self, sql):'''执行查询'''result =Falsetry:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)result =Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_delete'.format(sql,str(e)))sys.exit('ERROR: delete data from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return result
1.4.5. 更新数据
# 修改单个值# update tasks set status='running' where task_id='0791216839b04d5c88846817f78280cc';# 修改多个值# update tasks set status='running',score='10' where task_id='0791216839b04d5c88846817f78280cc';defexec_update(self, sql):'''执行更新'''result =Falsetry:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)result =Trueexcept Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_update'.format(sql,str(e)))sys.exit('ERROR: update data from database error caused {0}'.format(str(e)))finally:cursor.close()conn.commit()#conn.close()self.close_conn(conn)return result
1.4.6. 查询数据
# select * from users where user_name='hello';defexec_select(self, sql):'''执行查询'''try:conn = self.get_conn()cursor = conn.cursor()cursor.execute(sql)result = cursor.fetchall()#result = cursor.fetchone()except Exception as e:logging.error('ERROR: execute {0} causes error {1} in exec_select'.format(sql,str(e)))sys.exit('ERROR: load data from database error caused {0}'.format(str(e)))finally:cursor.close()#conn.close()#print("exec_select", len(self._pool), self._pool)print("init_pool", self._maxconns,len(self._pool), self._pool)self.close_conn(conn)return result
1.4.7. 测试
deftest_select(self, sql):result = self.exec_select(sql)print("test_select", result)return result
2. 操作使用实例
# for testfrom typing import Optional, List, Dict, Union
from pydantic import BaseModel, Fieldclass......if __name__ =='__main__':dbhost =""dbdatabase ="./test.db"db = SqliteTool(dbhost, dbdatabase)classTaskInDB(BaseModel):task_id:strdisabled:intdefcreate_tests_table(db):sql_str ="create table if not exists tests("sql_str +="task_id char(32) primary key,"sql_str +="disabled int not null"sql_str +=");"return db.create_table(sql_str)defdrop_tests_table(db):sql_str ="drop table if exists tests;"return db.drop_table(sql_str)defget_tests_indb(db, tasks: Union[List[str],str,None]=None):if tasks ==None:sql_str ="select * from tests;"iftype(tasks)==list:print("list", tasks)key_str =",".join(tasks)sql_str ="select * from tests where task_id in ("+ key_str +");"eliftype(tasks)==str:print("str", tasks)sql_str ="select * from tests where task_id ='"+tasks+"';"eliftype(tasks)==None:print("none")ret_tasks = db.exec_select(sql_str)return ret_tasksdefcreate_tests_indb(db, tasks: List[TaskInDB]):#return db.exec_insert_plus("tests", task_indb.model_dump())#return db.exec_insert_plus("tests", task_indb.dict())params =[]for task in tasks:print("create_tests_indb", task)#params.append(task.model_dump())params.append(task.dict())return db.exec_insert_many("tests", params)# 重建数据库ifnot drop_tests_table(db)ornot create_tests_table(db):print("ERROR")# 创建两条记录task_indb1 = TaskInDB(task_id="11111111", disabled=1)#create_tests_indb(db, task_indb)task_indb2 = TaskInDB(task_id="22222222", disabled=0)#create_tests_indb(db, task_indb)task_indb3 = TaskInDB(task_id="33333333", disabled=0)#create_tests_indb(db, task_indb)create_tests_indb(db,[task_indb1, task_indb2, task_indb3])# 查询记录#key_tup = tuple(TaskInDB.model_fields.keys())key_tup =tuple(TaskInDB.__fields__.keys())#key_str = ",".join(key_tup)# allret_tasks = get_tests_indb(db)# str#ret_tasks = get_tests_indb(db, tasks="11111111")# list#ret_tasks = get_tests_indb(db, tasks=["11111111", "22222222"])for ret_task in ret_tasks:print(ret_task)task_indb = TaskInDB(**{key: ret_task[i]for i,key inenumerate(key_tup)})print(task_indb)print("OK")