FastAPI + SQLAlchemy (异步版)连接数据库时,对数据进行加密
简介:此部分内容为,在FastAPI + SQLAlchemy (异步版)连接数据库时,需要对保存在数据库中的API_KEY 以及用户密码进行加密时所著:
一、AES-GCM对称加密步骤
1.特点 :
- 单一密钥:加密和解密使用相同的密钥(对称加密),需通过安全方式(如环境变量、密钥管理系统)存储和传输密钥。
- 密钥长度:支持 128、192、256 位密钥(推荐 256 位以获得最高安全性)。
2.加解密步骤:
1.生成密钥:
密钥一般存在环境变量中,使用安全的随机数生成器(如 Python 的 os.urandom
):
generate_aes_gcm_key.py:
import os
import base64# 1. 生成 AES-256 密钥
def generate_aes_gcm_key(key_size: int = 32) -> bytes:"""生成指定长度的 AES-GCM 密钥(字节串)。:param key_size: 16(AES-128)、24(AES-192)、32(AES-256):return: 密钥(bytes)"""if key_size not in (16, 24, 32):raise ValueError("Key size must be 16, 24, or 32 bytes")return os.urandom(key_size)# 2. 将密钥转换为 Base64 字符串(便于存储到环境变量)
def b64encode_generated_key(generated_key: bytes) ->str:key_base64 = base64.b64encode(generated_key).decode()# print("Base64 编码的密钥:", key_base64)return key_base64# 只运行一次,确保全流程中密钥统一
if __name__ == "__main__":generated_key = generate_aes_gcm_key()key_base64 = b64encode_generated_key(generated_key)print("编码后的密钥:",key_base64) # 需要手动保存到环境变量中(.env)
2.将generate_aes_gcm_key.py中生成的密钥 AES_KEY 手动复制添加到 .env文件中:
3.在 config.py 文件中加载环境变量
from pydantic_settings import BaseSettings
from dotenv import load_dotenv
import os# 加载环境变量(仅本地开发)
load_dotenv()class DifySetting(BaseSettings):MYSQL_HOST: strMYSQL_PORT: int = 3306MYSQL_USER: strMYSQL_PASSWORD: strMYSQL_NAME: strAPP_ENV: str = "dev"AES_KEY: str # AES 密钥(Base64 编码)AES_GCM_NONCE_SIZE: int = 12 # 注意,环境变量中存在的值,在DifySetting这个类中也必须包含class Config:env_file =".env"env_file_encoding = "utf-8"# 全局 AES-GCM 实例
dify_settings = DifySetting()
4.编写 aes_gcm_security.py 加密、解密函数:
import base64
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from app.core.config import dify_settings # 假设这是你的配置模块(包含 AES_KEY)
import os
from cryptography.exceptions import InvalidTag# 1. 初始化 AESGCM 对象
def load_aes_key() -> AESGCM:key_bytes = base64.b64decode(dify_settings.AES_KEY) # Base64 → bytesreturn AESGCM(key_bytes) # 创建 AESGCM 实例aesgcm = load_aes_key()# 2. 加密函数 - 返回单个组合字符串
def encrypt_aes_gcm_combined(plaintext: str) -> str:"""使用 AES-GCM 加密明文字符串,返回组合字符串(IV+密文的Base64编码):param plaintext: 明文(字符串):return: Base64编码的字符串(前16字符为IV,后面为密文)"""plaintext_bytes = plaintext.encode("utf-8")iv = os.urandom(12) # 生成12字节随机IVciphertext = aesgcm.encrypt(iv, plaintext_bytes, None)# 拼接IV和密文后整体进行Base64编码combined = iv + ciphertextreturn base64.b64encode(combined).decode("utf-8")# 3. 解密函数 - 从组合字符串解密
def decrypt_aes_gcm_combined(combined_base64: str) -> str:"""从组合字符串解密出原始明文:param combined_base64: Base64编码的组合字符串(IV+密文):return: 明文(字符串)"""try:combined = base64.b64decode(combined_base64)iv = combined[:12] # 前12字节为IVciphertext = combined[12:] # 剩余部分为密文plaintext_bytes = aesgcm.decrypt(iv, ciphertext, None)return plaintext_bytes.decode("utf-8")except InvalidTag:raise ValueError("解密失败:认证标签无效(密钥或数据损坏)")except Exception as e:raise ValueError(f"解密失败:{str(e)}")
5. 在编写 FastAPI 时调用加密、解密函数;
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
from datetime import datetime
from sqlalchemy import select, exc
from app.database.database import db_dependency
from app.models.dify_models_ORM import Agent
# 导入优化后的加密函数
from app.core.aes_gcm_security import encrypt_aes_gcm_combined, decrypt_aes_gcm_combineddify_router = APIRouter()# 定义 Pydantic 模型
class AgentResponse(BaseModel):id: intagent_name: stragent_describe: stragent_url: stragent_Content_Type: stragent_api_key: str # 返回解密后的API Keyuser: strcreated_at: datetimeclass Config:from_attributes = Trueclass CreateAgentRequest(BaseModel):agent_name: stragent_describe: strurl: strapi_key: str # 接收明文API Keycontent_type: str = "application/json"user: str# 创建 Agent - 使用优化后的加密方法
@dify_router.post("/dify_agents", status_code=status.HTTP_201_CREATED)
async def create_agent(request: CreateAgentRequest, db: db_dependency):try:# 使用新的组合加密方法combined_ciphertext = encrypt_aes_gcm_combined(request.api_key)db_agent = Agent(agent_name=request.agent_name,agent_describe=request.agent_describe,agent_url=request.url,agent_api_key=combined_ciphertext, # 存储组合密文agent_Content_Type=request.content_type,user=request.user,created_at=datetime.utcnow())db.add(db_agent)await db.commit()await db.refresh(db_agent)return {"agent_id": db_agent.id}except exc.IntegrityError:await db.rollback()raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="API Key 已存在")except Exception as e:await db.rollback()raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))# 查询单个 Agent - 使用优化后的解密方法
@dify_router.get("/dify_agents/{agent_id}", status_code=status.HTTP_200_OK)
async def read_agent(agent_id: int, db: db_dependency):try:result = await db.execute(select(Agent).where(Agent.id == agent_id))agent = result.scalars().first()if not agent:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent 未找到")# 解密逻辑 - 使用新的组合解密方法try:decrypted_key = decrypt_aes_gcm_combined(agent.agent_api_key)# 创建代理对象副本,避免直接修改ORM对象,即返回为此副本中数据agent_data = {"id": agent.id,"agent_name": agent.agent_name,"agent_describe": agent.agent_describe,"agent_url": agent.agent_url,"agent_Content_Type": agent.agent_Content_Type,"agent_api_key": decrypted_key, # 使用解密后的密钥"user": agent.user,"created_at": agent.created_at}return AgentResponse(**agent_data)except Exception as e:raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,detail=f"解密失败: {str(e)}")except Exception as e:raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))# 查询所有 Agents - 不返回敏感API Key
@dify_router.get("/dify_agents", status_code=status.HTTP_200_OK)
async def read_agents(db: db_dependency):try:result = await db.execute(select(Agent))agents = result.scalars().all()# 返回不包含敏感API Key的数据safe_agents = []for agent in agents:safe_agents.append({"id": agent.id,"agent_name": agent.agent_name,"agent_describe": agent.agent_describe,"agent_url": agent.agent_url,"agent_Content_Type": agent.agent_Content_Type,"user": agent.user,"created_at": agent.created_at})return safe_agentsexcept Exception as e:raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))# 删除 Agent - 保持不变
@dify_router.delete("/dify_agents/{agent_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_agent(agent_id: int, db: db_dependency):try:result = await db.execute(select(Agent).where(Agent.id == agent_id))agent = result.scalar()if not agent:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent 未找到")await db.delete(agent)await db.commit()except Exception as e:await db.rollback()raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))return None
二、哈希算法加密用户密码:
1.特点:
- 密码哈希算法是单向的(不可逆),专门为存储密码(等无需还原原来明文信息所设计);
- 抗暴力破解:通过加盐(Salt)和多次迭代(Work Factor)增加计算成本;