当前位置: 首页 > news >正文

FastAPI + SQLAlchemy (异步版)连接数据库时,对数据进行加密

简介:此部分内容为,在FastAPI + SQLAlchemy (异步版)连接数据库时,需要对保存在数据库中的API_KEY 以及用户密码进行加密时所著:

一、AES-GCM对称加密步骤

1.特点 :

  • 单一密钥:加密和解密使用相同的密钥(对称加密),需通过安全方式(如环境变量、密钥管理系统)存储和传输密钥。
  • 密钥长度:支持 128、192、256 位密钥(推荐 256 位以获得最高安全性)。

2.加解密步骤: 

1.生成密钥:

密钥一般存在环境变量中,使用安全的随机数生成器(如 Python 的 os.urandom):

generate_aes_gcm_key.py:

import os
import base64# 1. 生成 AES-256 密钥
def generate_aes_gcm_key(key_size: int = 32) -> bytes:"""生成指定长度的 AES-GCM 密钥(字节串)。:param key_size: 16(AES-128)、24(AES-192)、32(AES-256):return: 密钥(bytes)"""if key_size not in (16, 24, 32):raise ValueError("Key size must be 16, 24, or 32 bytes")return os.urandom(key_size)# 2. 将密钥转换为 Base64 字符串(便于存储到环境变量)
def b64encode_generated_key(generated_key: bytes) ->str:key_base64 = base64.b64encode(generated_key).decode()# print("Base64 编码的密钥:", key_base64)return key_base64# 只运行一次,确保全流程中密钥统一
if __name__ == "__main__":generated_key = generate_aes_gcm_key()key_base64 = b64encode_generated_key(generated_key)print("编码后的密钥:",key_base64) # 需要手动保存到环境变量中(.env)

2.将generate_aes_gcm_key.py中生成的密钥 AES_KEY 手动复制添加到 .env文件中:

3.在 config.py 文件中加载环境变量

from pydantic_settings import BaseSettings
from dotenv import load_dotenv
import os# 加载环境变量(仅本地开发)
load_dotenv()class DifySetting(BaseSettings):MYSQL_HOST: strMYSQL_PORT: int = 3306MYSQL_USER: strMYSQL_PASSWORD: strMYSQL_NAME: strAPP_ENV: str = "dev"AES_KEY: str  # AES 密钥(Base64 编码)AES_GCM_NONCE_SIZE: int = 12  # 注意,环境变量中存在的值,在DifySetting这个类中也必须包含class Config:env_file =".env"env_file_encoding = "utf-8"# 全局 AES-GCM 实例
dify_settings = DifySetting()

4.编写 aes_gcm_security.py 加密、解密函数:

import base64
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from app.core.config import dify_settings  # 假设这是你的配置模块(包含 AES_KEY)
import os
from cryptography.exceptions import InvalidTag# 1. 初始化 AESGCM 对象
def load_aes_key() -> AESGCM:key_bytes = base64.b64decode(dify_settings.AES_KEY)  # Base64 → bytesreturn AESGCM(key_bytes)  # 创建 AESGCM 实例aesgcm = load_aes_key()# 2. 加密函数 - 返回单个组合字符串
def encrypt_aes_gcm_combined(plaintext: str) -> str:"""使用 AES-GCM 加密明文字符串,返回组合字符串(IV+密文的Base64编码):param plaintext: 明文(字符串):return: Base64编码的字符串(前16字符为IV,后面为密文)"""plaintext_bytes = plaintext.encode("utf-8")iv = os.urandom(12)  # 生成12字节随机IVciphertext = aesgcm.encrypt(iv, plaintext_bytes, None)# 拼接IV和密文后整体进行Base64编码combined = iv + ciphertextreturn base64.b64encode(combined).decode("utf-8")# 3. 解密函数 - 从组合字符串解密
def decrypt_aes_gcm_combined(combined_base64: str) -> str:"""从组合字符串解密出原始明文:param combined_base64: Base64编码的组合字符串(IV+密文):return: 明文(字符串)"""try:combined = base64.b64decode(combined_base64)iv = combined[:12]  # 前12字节为IVciphertext = combined[12:]  # 剩余部分为密文plaintext_bytes = aesgcm.decrypt(iv, ciphertext, None)return plaintext_bytes.decode("utf-8")except InvalidTag:raise ValueError("解密失败:认证标签无效(密钥或数据损坏)")except Exception as e:raise ValueError(f"解密失败:{str(e)}")

 5. 在编写 FastAPI 时调用加密、解密函数;

from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
from datetime import datetime
from sqlalchemy import select, exc
from app.database.database import db_dependency
from app.models.dify_models_ORM import Agent
# 导入优化后的加密函数
from app.core.aes_gcm_security import encrypt_aes_gcm_combined, decrypt_aes_gcm_combineddify_router = APIRouter()# 定义 Pydantic 模型
class AgentResponse(BaseModel):id: intagent_name: stragent_describe: stragent_url: stragent_Content_Type: stragent_api_key: str  # 返回解密后的API Keyuser: strcreated_at: datetimeclass Config:from_attributes = Trueclass CreateAgentRequest(BaseModel):agent_name: stragent_describe: strurl: strapi_key: str  # 接收明文API Keycontent_type: str = "application/json"user: str# 创建 Agent - 使用优化后的加密方法
@dify_router.post("/dify_agents", status_code=status.HTTP_201_CREATED)
async def create_agent(request: CreateAgentRequest, db: db_dependency):try:# 使用新的组合加密方法combined_ciphertext = encrypt_aes_gcm_combined(request.api_key)db_agent = Agent(agent_name=request.agent_name,agent_describe=request.agent_describe,agent_url=request.url,agent_api_key=combined_ciphertext,  # 存储组合密文agent_Content_Type=request.content_type,user=request.user,created_at=datetime.utcnow())db.add(db_agent)await db.commit()await db.refresh(db_agent)return {"agent_id": db_agent.id}except exc.IntegrityError:await db.rollback()raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="API Key 已存在")except Exception as e:await db.rollback()raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))# 查询单个 Agent - 使用优化后的解密方法
@dify_router.get("/dify_agents/{agent_id}", status_code=status.HTTP_200_OK)
async def read_agent(agent_id: int, db: db_dependency):try:result = await db.execute(select(Agent).where(Agent.id == agent_id))agent = result.scalars().first()if not agent:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent 未找到")# 解密逻辑 - 使用新的组合解密方法try:decrypted_key = decrypt_aes_gcm_combined(agent.agent_api_key)# 创建代理对象副本,避免直接修改ORM对象,即返回为此副本中数据agent_data = {"id": agent.id,"agent_name": agent.agent_name,"agent_describe": agent.agent_describe,"agent_url": agent.agent_url,"agent_Content_Type": agent.agent_Content_Type,"agent_api_key": decrypted_key,  # 使用解密后的密钥"user": agent.user,"created_at": agent.created_at}return AgentResponse(**agent_data)except Exception as e:raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,detail=f"解密失败: {str(e)}")except Exception as e:raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))# 查询所有 Agents - 不返回敏感API Key
@dify_router.get("/dify_agents", status_code=status.HTTP_200_OK)
async def read_agents(db: db_dependency):try:result = await db.execute(select(Agent))agents = result.scalars().all()# 返回不包含敏感API Key的数据safe_agents = []for agent in agents:safe_agents.append({"id": agent.id,"agent_name": agent.agent_name,"agent_describe": agent.agent_describe,"agent_url": agent.agent_url,"agent_Content_Type": agent.agent_Content_Type,"user": agent.user,"created_at": agent.created_at})return safe_agentsexcept Exception as e:raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))# 删除 Agent - 保持不变
@dify_router.delete("/dify_agents/{agent_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_agent(agent_id: int, db: db_dependency):try:result = await db.execute(select(Agent).where(Agent.id == agent_id))agent = result.scalar()if not agent:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent 未找到")await db.delete(agent)await db.commit()except Exception as e:await db.rollback()raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))return None

二、哈希算法加密用户密码:

1.特点:

  • 密码哈希算法是​​单向的​​(不可逆),专门为存储密码(等无需还原原来明文信息所设计);
  • 抗暴力破解​​:通过加盐(Salt)和多次迭代(Work Factor)增加计算成本;

2.加密步骤: 

http://www.lryc.cn/news/584792.html

相关文章:

  • React Three Fiber 实现 3D 模型点击高亮交互的核心技巧
  • Gin 中常见参数解析方法
  • 用TensorFlow进行逻辑回归(二)
  • 闲庭信步使用图像验证平台加速FPGA的开发:第九课——图像插值的FPGA实现
  • 硬件加速(FPGA)
  • BigFoot Decursive 2.7.28 2025.07.11
  • MyBatis插件机制揭秘:从拦截器开发到分页插件实战
  • 深入剖析 ADL:C++ 中的依赖查找机制及其编译错误案例分析
  • Linux面试问题-软件测试
  • RISC-V:开源芯浪潮下的技术突围与职业新赛道 (二) RISC-V架构深度解剖(上)
  • idea如何打开extract surround
  • 【C++】——类和对象(上)
  • Linux指令与权限
  • Navicat实现MySQL数据传输与同步完整指南
  • python正则表达式(小白五分钟从入门到精通)
  • Vue 中监测路由变化时,通常不需要开启深度监听(deep: true)
  • Spring事务管理深度解析:原理、实践与陷阱
  • STM32-ADC
  • squash压缩合并
  • 计算机视觉速成 之 概述
  • 【学习笔记】机器学习(Machine Learning) | 第七章|神经网络(2)
  • Linux:库的原理
  • (C++)任务管理系统(文件存储)(正式版)(迭代器)(list列表基础教程)(STL基础知识)
  • 【算法笔记 day three】滑动窗口(其他类型)
  • 使用球体模型模拟相机成像:地面与天空的可见性判断与纹理映射
  • STM32第十九天 ESP8266-01S和电脑实现串口通信(2)
  • Linux 日志分析核心命令速查表
  • UE5源码模块解析与架构学习
  • 【第二节】ubuntu server配置静态IP
  • 蜻蜓I即时通讯系统重构宣言:破茧重生的技术革命-长痛不如短痛卓伊凡|麻子|果果