当前位置: 首页 > news >正文

Python连接Mysql、Postgre、ClickHouse、Redis常用库及封装方法

博主在这里分享一些常见的python连接数据库或中间件的库和封装方案,希望对大家有用。

Mysql封装

#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys
import pymysql
from settings import MYSQL_DB, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD, MYSQL_HOST, ENV
# 这个自己在配置文件写好,就不暴露出来了class MysqlClient(object):def __init__(self, host, port, user, password, db):"""初始化连接"""self.conn = Noneself.cursor = Noneself.host = hostself.port = portself.user = userself.password = passwordself.db = dbself.conn = pymysql.Connect(host=self.host, user=self.user, password=self.password,database=self.db, port=self.port)self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)self.cur = self.conn.cursor()  # 区别是上面那个返回的记录是字典,这个是元组def __del__(self):if self.cursor is not None:self.cursor.close()if self.cur is not None:self.cur.close()if self.conn is not None:self.conn.close()if ENV != 'dev' and self.server is not None:self.server.close()def query_record(self, query_sql=None):"""默认查询接口, 获取所有数据:param query_sql::return: List"""self.cursor.execute(query_sql)self.conn.commit()return self.cursor.fetchall()def insert_record(self, insert_sql, res):"""默认写入所有数据:param insert_sql::param res::return:"""if ENV != 'dev':print("不允许执行", ENV)sys.exit(-1)self.cursor.executemany(insert_sql, res)self.conn.commit()def delete_record(self, delete_sql):"""删除数据:param delete_sql::return:"""if ENV != 'dev':print("不允许执行", ENV)sys.exit(-1)self.cursor.execute(delete_sql)self.conn.commit()def update_record(self, update_sql, res_tuple):"""更新数据:param update_sql::param res_tuple::return:"""if ENV != 'dev':print("不允许执行", ENV)sys.exit(-1)self.cursor.execute(update_sql % res_tuple)self.conn.commit()# 查询一条数据,并返回表头def search_one_with_header(self, sql):self.cursor.execute(sql)result = self.cursor.fetchone()self.conn.commit()return resultdef search_all_with_header(self, sql):# 暂不使用self.cursor,因为它返回的是字典格式self.cur.execute(sql)result = self.cur.fetchall()self.conn.commit() data_dict = []for field in self.cur.description:data_dict.append(field[0])return result, data_dictif __name__ == "__main__":sql = 'select * from user_info limit 2'mysql_client = MysqlClient(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DB)res = mysql_client.query_record(sql)for r in res:print(r)

Postgre封装

import psycopg2
import sys
from psycopg2 import extras
from settings import PG_DB, PG_PORT, PG_USER, PG_PASSWORD, PG_HOST, ENVclass PgClient(object):def __init__(self, host, port, user, password, db):"""初始化连接"""self.conn = Noneself.host = hostself.port = portself.user = userself.password = passwordself.db = dbself.conn = psycopg2.connect(host=self.host, user=self.user, password=self.password,database=self.db, port=self.port)self.cursor = self.conn.cursor(cursor_factory=extras.DictCursor) # 字典形式self.cur = self.conn.cursor()  # 元组格式def __del__(self):if self.cursor is not None:self.cursor.close()if self.cur is not None:self.cur.close()if self.conn is not None:self.conn.close()if ENV != 'dev' and self.server is not None:try:self.server.close()except Exception as e:passdef query_record(self, query_sql=None):"""默认查询接口, 获取所有数据:param query_sql::return: List"""self.cursor.execute(query_sql)self.conn.commit()  # 防止轮询时,查到的结果一直未更新return self.cursor.fetchall()def insert_record(self, insert_sql, res):"""默认写入所有数据:param insert_sql::param res::return:"""if ENV != 'dev':print("不允许执行", ENV)sys.exit(-1)self.cursor.executemany(insert_sql, res)self.conn.commit()def delete_record(self, delete_sql):"""删除数据:param delete_sql::return:"""if ENV != 'dev':print("不允许执行", ENV)sys.exit(-1)self.cursor.execute(delete_sql)self.conn.commit()def update_record(self, update_sql, res_tuple):"""更新数据:param update_sql::param res_tuple::return:"""if ENV != 'dev':print("不允许执行", ENV)sys.exit(-1)self.cursor.execute(update_sql % res_tuple)self.conn.commit()pg_ofclient = PgClient(PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DB)
sql = "select * from user_info limit 2"
rows = pg_ofclient.query_record(sql)
for row in rows:print(row)

ClickHouse封装

#!/usr/bin/python
# -*- coding: utf-8 -*-
from clickhouse_driver import connect
from settings import CK_HOST, CK_PORT, CK_DB, CK_USER, CK_PW, ENVclass CkClient(object):def __init__(self, host, port, user, password, db):"""初始化连接"""self.conn = Noneself.host = hostself.port = portself.user = userself.password = passwordself.db = dbself.conn = connect(host=self.host, user=self.user, password=self.password,database=self.db, port=self.port)self.cur = self.conn.cursor()  def __del__(self):if self.cur is not None:self.cur.close()if self.conn is not None:self.conn.close()if ENV != 'dev' and self.server is not None:try:self.server.close()except Exception as e:passdef query_record(self, query_sql=None):"""默认查询接口, 获取所有数据:param query_sql::return: List"""self.cur.execute(query_sql)columns = [desc[0] for desc in self.cur.description]result = [dict(zip(columns, row)) for row in self.cur.fetchall()]self.conn.commit() return resultdef search_all_with_header(self, sql):self.cur.execute(sql)result = self.cur.fetchall()self.conn.commit()data_dict = []for field in self.cur.description:data_dict.append(field[0])return result, data_dictif __name__ == "__main__":ck_client = CkClient(CK_HOST, CK_PORT, CK_USER, CK_PW, CK_DB)sql = "select * from user_info limit 2"rows = ck_client.query_record(sql)print(rows)

Redis封装

#!/usr/bin/python
# -*- coding: utf-8 -*-
import redis
import sys
import json
import time
from settings import redis_host, redis_port, redis_pwclass RedisClient:def __init__(self, host=redis_host, port=redis_port, password=redis_pw, db=5):self.host = hostself.port = portself.password = passwordself.db = dbself.connection = redis.Redis(host=self.host, port=self.port, password=self.password, db=self.db)def get(self, key):return self.connection.get(key)def hget_all(self, hash_name):return self.connection.hgetall(hash_name)def delete(self, key):if ENV != 'dev':print("不允许执行", ENV)sys.exit(-1)self.connection.delete(key)def set(self, key, value, expiration=None):if ENV != 'dev':print("不允许执行", ENV)sys.exit(-1)self.connection.set(key, value, ex=expiration)def set_json(self, key, value, expiration=None):json_value = json.dumps(value)  # 将 JSON 对象转换为字符串self.connection.set(key, json_value, ex=expiration)def get_json(self, key):json_value = self.connection.get(key)if json_value is not None:value = json.loads(json_value)  # 将字符串转换为 JSON 对象return valuereturn None
if __name__ == '__main__':# 使用示例redis_client = RedisClient()key = 'XXXX:XX:XX'while True:d = redis_client.get(key=key)print(d)time.sleep(0.1)
http://www.lryc.cn/news/484904.html

相关文章:

  • 如何修改npm包
  • Django 2024全栈开发指南(三):数据库模型与ORM操作(上篇)
  • 低代码可视化-uniapp开关选择组件-低码生成器
  • 【arxiv‘24】Vision-Language Navigation with Continual Learning
  • 如何在 Ubuntu 上安装 Jupyter Notebook
  • 免费申请 Let‘s Encrypt SSL 证书
  • 【JAVA】Java基础—面向对象编程:继承—重写父类方法
  • 【C++初阶】C++入门
  • 自然推理系统:的拒取式的解析
  • OceanBase 分区表详解
  • Java中 LinkedList<>,ArrayDeque<>的区别 || Queue和Deque的区别
  • freemarker 读取template.xml ,通过response 输出文件,解决中文乱码问题
  • arkUI:水果选择与管理:基于 ArkUI 的长按编辑功能实现
  • docker使用,docker图形化界面+docker详细命令
  • idea项目运行时 java: 错误: 不支持发行版本 21
  • hive alter table add columns 是否使用 cascade 的方案
  • 手机怎么玩steam游戏?随时随地远程串流玩steam游戏教程
  • 【使用antv g6实现拓扑图】
  • 【数学 函数空间】拉普拉斯变换解微分方程步骤
  • vue3: toRef, reactive, toRefs, toRaw
  • Unity读取Json
  • 基于STM32的智能语音识别饮水机系统设计
  • c++的几种构造函数
  • FRP 实现内网穿透
  • 数据结构笔记(其八)--一般树的存储及其遍历
  • 在spring boot工程中使用Filter时,@WebFilter 注解不生效的问题分析和解决方案
  • 浅谈“通感一体”
  • 【Linux】监控系统Zabbix的安装与配置
  • Springboot定时任务
  • node.js知识点总结