当前位置: 首页 > news >正文

从零实现一个基于 mem0的具有长期记忆的Text2SQL代理

长期记忆将普通的 text2SQL 代理转换为智能助手,可以跨会话记住用户偏好。本教程介绍如何构建受 Mem0 架构研究论文启发的自定义实现。

重要说明

从头开始开发了遵循 Mem0 研究论文中描述的核心原则的代码。这为我们提供了更大的灵活性来专门针对 text2SQL 应用程序调整架构。

为什么长期记忆对 Text2SQL 很重要

在传统的 text2SQL 系统中,会话在每个会话中从头开始。首选项、术语和上下文消失,迫使用户反复重新教育他们的 AI 助手。

这会产生很大的认知负担:金融分析师必须每天早上指定“仅显示已批准的贷款”,物业经理必须在每次会议中重新定义“豪华物业”,后续问题需要不断刷新上下文。

长期记忆通过使您的 text2SQL 代理能够:

  • 记住会话中的首选项,按用户特定标准(如“排除已取消的订单”)自动筛选
  • 学习连接用户语言和数据库架构的域术语,例如 “quarterly performers” = “accounts with activity in last three months”
  • 随着时间的推移保持上下文,允许在几天后自然地跟进,例如“其中有多少来自加利福尼亚?
  • 构建对每个用户的数据兴趣的个性化理解,无需明确说明即可提高相关性

此功能不仅增强了体验,还从根本上改变了人们与数据交互的方式,为复杂的数据库信息创建了一个更直观、更人性化的界面。

什么是 Mem0 以及它如何激发我们的多用户 Text2SQL 架构

Mem0(发音为“mem-zero”)是 mem0.ai 的一种内存架构,它通过两阶段系统解决有限的上下文窗口问题:从对话中提取关键信息并更新内存以保持一致性。

此体系结构通过多用户内存隔离增强了 Mem0:

  1. 完全内存隔离:每个用户的首选项单独存储,防止交叉污染。
  2. 用户目标提取:此 Text2SQL 内存组件可识别每个用户的 SQL 相关元素(实体、首选项、术语、量度)。
  3. 数据库集成:将个性化内存直接连接到数据库架构,以实现准确的 SQL 生成。

此架构创建了个性化的数据库界面,其中金融分析师可以自动按首选指标进行筛选,物业经理使用专业术语,高管可以参考自定义 KPI — 所有这些都无需在每次会话中重申偏好。该系统为每个用户的交互模式构建了一个持久的心智模型,以适合个人需求的方式将自然语言和 SQL 连接起来。

此 text2sql + 长期内存架构的核心组件

如图所示,这种 text2sql 长期内存架构由两个协同工作的主要阶段组成:

None

1. 提取阶段(蓝色)

提取阶段从对话中捕获重要信息:

  • 消息摄取:系统处理新的消息对(用户问题 + AI 响应)。
  • 上下文检索:利用两个上下文来源:
  • 封装整个历史记录的对话摘要
  • 用于即时上下文的最新 m 条消息
  • LLM 处理:LLM 分析对话上下文以提取相关信息。
2. Text2SQL 用户特定内存提取(红色)

此专用组件将一般对话元素转换为结构化的数据库相关记忆:

  • 实体提取:标识用户经常引用的数据库实体(表、字段)
  • 首选项捕获:特定于每个用户的记录筛选和排序首选项
  • 术语识别:将用户定义的术语映射到其数据库等效项
  • Metric Definition:存储用户定义的自定义计算或标准
3. 更新阶段(绿色)

更新阶段维护一致的知识库:

  • 相似性搜索:系统使用向量数据库查找相似的现有记忆
  • 作分类:系统决定是否:
  • ADD:在没有类似回忆存在时创建新的回忆
  • 更新:用新信息增强现有记忆
  • DELETE:删除过时或矛盾的记忆
  • NOOP:当信息冗余时不做任何更改

所有作都以顶部的 Vector Database 为中心,该数据库通过适当的用户隔离来存储内存,从而在保持用户之间隐私的同时实现个性化响应。

该架构可确保仅存储相关的非冗余信息,同时随着时间的推移保持对话的连贯视图。最重要的是,通过显式建模用户特定的 SQL 相关元素(如实体和术语),系统可以将自然语言转换为 SQL,比通用 text2SQL 方法更准确、更个性化。

实现多用户内存存储

我们的 text2SQL 内存系统的核心是其基于 PostgreSQL 的存储实现,用于生产部署。

此实现利用 pgvector 扩展在高维嵌入向量中进行高效的相似性搜索,处理三个关键方面:

  1. 基于向量的相似性搜索:将嵌入存储为原生向量类型,直接在数据库中执行余弦相似度运算,而无需传输大型向量数据。
  2. 用户隔离user_id 字段既用作安全边界,又用作性能优化,确保一个用户的内存不会影响另一个用户的内存。
  3. 分层索引:HNSW(分层可导航小世界)索引通过分层图形结构加速相似性搜索,支持对数时间近似最近邻搜索,这对于数千个内存条目的实时性能至关重要。

PostgreSQL 内存存储

class PostgresMemoryStore:"""PostgreSQL-based memory storage with user isolation"""def __init__(self, connection_string: str):self.conn_string = connection_stringself.embedding_dim = 1536  # OpenAI embedding dimensionself._init_db()def _init_db(self):"""Initialize database tables if they don't exist"""try:import psycopg2# Connect to PostgreSQLconn = psycopg2.connect(self.conn_string)cursor = conn.cursor()# Check if tables existcursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'memories')")table_exists = cursor.fetchone()[0]# Ensure vector extension is loadedcursor.execute("CREATE EXTENSION IF NOT EXISTS vector;")conn.commit()if not table_exists:# Create memories table with user_id fieldcursor.execute("""CREATE TABLE IF NOT EXISTS memories (id SERIAL PRIMARY KEY,user_id TEXT NOT NULL,content TEXT NOT NULL,created_at FLOAT NOT NULL,source TEXT,metadata JSONB DEFAULT '{}'::JSONB,embedding vector(1536));""")# Create index for vector similarity searchcursor.execute("""CREATE INDEX memories_embedding_idx ON memories USING hnsw (embedding vector_cosine_ops);""")

数据库架构维护三个核心表:

  • memories:使用嵌入来存储实际的内存内容
  • conversation_summaries:维护对话历史记录的压缩表示形式
  • recent_messages:缓存最近的交互以获取即时上下文

每个表都通过对 user_id 列进行索引来强制实施用户隔离,从而允许在不进行表扫描的情况下进行高效的每用户作。

连接到数据平台

用于语义模型提取的 API 客户端

此 API 客户端充当我们的内存系统和底层数据平台之间的灵活连接器。尽管我们的实现在此示例中使用了 Denodo,但该架构设计为与平台无关,并且可以轻松适应任何数据库系统:

  • Snowflake:使用 Snowflake 的信息架构视图 (INFORMATION_SCHEMA.TABLES INFORMATION_SCHEMA.COLUMNS 构建全面的架构理解,包括表关系和列描述。
  • Databricks:通过 Unity Catalog API 或 DESCRIBE EXTENDED 命令访问目录元数据,以捕获表结构、列详细信息和可用筛选器。
  • **SQL Server/Oracle/PostgreSQL/MySQL:**通过各自的系统目录生成语义模型,其中包括有关数据库、表、列、描述和可用筛选器的元数据。
class DenodoAPIClient:"""Client for interacting with Denodo data virtualization platform"""def __init__(self, api_host: str):self.api_host = api_hostdef _get_basic_auth_header(self, username: str, password: str) -> str:"""Create Basic Authentication header"""auth_string = f"{username}:{password}"auth_bytes = auth_string.encode("utf-8")auth_b64 = base64.b64encode(auth_bytes).decode("utf-8")return f"Basic {auth_b64}"async def get_metadata(self, database_name: str, username: str, password: str) -> Dict[str, Any]:"""Get metadata from data platform"""try:url = f"{self.api_host}/getMetadata"headers = {"Content-Type": "application/json","Authorization": self._get_basic_auth_header(username, password)}params = {"database_name": database_name,"include_schema": True}# Make API request to get database schemaresponse = requests.get(url, headers=headers, params=params)if response.status_code != 200:return {"error": f"Status code: {response.status_code}"}return response.json()except Exception as e:return {"error": str(e)}async def answer_question(self, question: str, username: str, password: str) -> Dict[str, Any]:"""Send a question and get SQL + results"""try:url = f"{self.api_host}/answerDataQuestion"headers = {"Content-Type": "application/json","Authorization": self._get_basic_auth_header(username, password)}payload = {"question": question,"format": "json"}# Make API request to execute queryresponse = requests.post(url, headers=headers, json=payload)if response.status_code != 200:return {"error": f"Status code: {response.status_code}"}return response.json()except Exception as e:return {"error": str(e)}

客户端实现两种主要方法:

  • get_metadata():检索架构信息(表、视图、列、关系、数据类型)以准确生成 SQL。
  • answer_question()):回答应用了记忆的问题,生成 SQL,并针对数据库执行它。

身份验证使用具有每用户凭据的基本身份验证,保持用户隔离并确保用户仅访问授权数据,同时受益于个性化内存。

模块化设计允许在不更改核心内存架构的情况下交换数据平台,利用现有基础设施,同时添加自然语言功能。

内存代理实现

Mem0Agent 类是我们系统的认知核心,负责协调跨用户会话的记忆提取、检索和管理。

class Mem0Agent:"""Memory agent inspired by Mem0 architecture for text2SQL"""def __init__(self, use_postgres: bool = False, postgres_conn_string: str = None):# Initialize the appropriate memory storeif use_postgres and postgres_conn_string:try:import psycopg2self.store = PostgresMemoryStore(postgres_conn_string)print("Using PostgreSQL for memory storage")except ImportError:print("PostgreSQL support not available. Using JSON storage.")self.store = JSONMemoryStore()else:self.store = JSONMemoryStore()self.db_schema = Noneself.current_user_id = Noneself.max_recent_messages = 10def load_user_context(self, user_id: str):"""Load context for a specific user"""self.current_user_id = user_idself.memories = self.store.load_memories(user_id)self.conversation_summary = self.store.get_conversation_summary(user_id)self.recent_messages = self.store.get_recent_messages(user_id)

此实现使用策略模式来抽象存储机制,从而在 PostgreSQL(生产)和 JSON(开发)之间实现灵活性。这种分离使代理能够专注于独立于存储细节的内存作。

代理保持 Critical 状态:

  • db_schema:用于 SQL 生成的数据库元数据
  • current_user_id:用于内存隔离的活动用户标识符
  • max_recent_messages:可配置的即时上下文窗口

load_user_context() 方法实现了上下文切换,按需加载用户特定的记忆和对话。这实现了多用户支持,而不会在用户之间发生内存泄漏 - 这是一项关键的隐私要求。

内存提取和更新进程

我们系统的核心是它如何提取和更新内存:

async def extract_memories(self, message_pair: List[str]) -> List[str]:"""Extract salient memories from a message pair using LLM"""prompt = f"""You are an intelligent memory extraction system. Your task is to identify important information from conversations about database queries that should be remembered for future reference.Context:
Conversation summary so far: {self.conversation_summary}
Recent messages: {self.recent_messages}
Current message pair:
Human: {message_pair[0]}
AI: {message_pair[1]}
Extract 0-3 concise, salient facts that would be useful to remember in future database queries.
Pay special attention to the following categories and tag them accordingly:
1. [PREFERENCE] User's query preferences or filters (e.g., "User prefers to see loans with interest rates below 5%")
2. [TERM] Custom terminology or abbreviations defined by the user (e.g., "WC means West Coast states: CA, OR, and WA")
3. [METRIC] Custom metrics or calculations defined by the user (e.g., "High-risk loans are defined as those with credit score below 650 and loan amount over $200,000")
Format each memory as a single, concise sentence with the appropriate tag prefix. If no important information is worth retaining, return an empty list.
Extracted memories:"""# Send prompt to LLM and parse response...return memories

内存提取过程使用零样本学习,其中 LLM 充当语义过滤器来识别值得记住的信息,从而解决了确定哪些值得存储哪些可以遗忘的挑战。

提示工程使用针对 text2SQL 交互优化的结构化分类法(首选项、术语、指标),从而实现:

  1. 特定于域的检索:在 SQL 生成过程中,每种内存类型的处理方式不同
  2. 目标应用程序:首选项适用于筛选,术语适用于令牌替换
  3. 高效压缩:结构化记忆比原始对话需要更少的令牌空间

异步实现确保内存作不会阻止聊天流,从而允许与其他进程同时进行提取

与 Data Platform 集成

Mem0Text2SQLChatbot 类充当协调四个组件的门面

  1. 用于用户特定上下文的内存代理
  2. 用于数据库交互的 API 客户端
  3. 用于对话学习的内存提取管道
  4. 用于 SQL 生成的 Schema 感知系统

set_user() 方法实现会话初始化

  1. 存储身份验证凭证
  2. 加载用户的内存上下文
  3. 重置数据库元数据状态

身份验证和内存上下文之间的这种分离可以处理凭据和个性化配置文件独立发展的场景 — 这是常见的企业要求。

class Mem0Text2SQLChatbot:"""Chatbot that integrates memory with text2SQL capabilities"""def __init__(self,use_postgres: bool = False,postgres_conn_string: str = None,database_name: str = "bank"):self.agent = Mem0Agent(use_postgres, postgres_conn_string)self.denodo_client = DenodoAPIClient(DENODO_API_HOST)self.database_name = database_nameself.db_loaded = Falseself.current_user = Nonedef set_user(self, username: str, password: str):"""Set the current user for the chatbot"""self.current_user = {"username": username,"password": password}# Load user context from memory storeself.agent.load_user_context(username)# Reset database loaded flag for the new userself.db_loaded = Falseasync def initialize_db(self):"""Load database metadata at startup"""if not self.current_user:return Falseif not self.db_loaded:print(f"Loading database schema for '{self.database_name}' for user {self.current_user['username']}...")metadata = await self.denodo_client.get_metadata(self.database_name,self.current_user["username"],self.current_user["password"])if "error" in metadata:print(f"Error loading database schema: {metadata['error']}")return Falseself.agent.set_db_schema(metadata)self.db_loaded = Truereturn Truereturn True

消息处理功能实现了一个智能管道,通过八个关键阶段将自然语言转换为个性化 SQL——从身份验证验证到架构初始化、内存检索和分类,再到查询增强、SQL 生成,最后是内存提取和持久化。

async def process_message(self, user_message: str) -> str:"""Process a user message and generate a SQL response"""# Ensure we have a current userif not self.current_user:return "Please log in first."# Initialize database if not already doneif not self.db_loaded:db_initialized = await self.initialize_db()if not db_initialized:return "I'm having trouble connecting to the database. Please try again later."# Add user message to historyself.agent.add_message_to_history("Human", user_message)# Retrieve relevant memoriesrelevant_memories = await self.agent.retrieve_relevant_memories(user_message)# Process and categorize memoriespreferences = []terminology = []metrics = []general_context = []for memory in relevant_memories:if memory.startswith("[PREFERENCE]"):preferences.append(memory.replace("[PREFERENCE]", "").strip())elif memory.startswith("[TERM]"):terminology.append(memory.replace("[TERM]", "").strip())elif memory.startswith("[METRIC]"):metrics.append(memory.replace("[METRIC]", "").strip())else:general_context.append(memory)# Apply terminology substitutions and enhance the questionenhanced_question = user_messagefor term in terminology:if "means" in term:# Apply terminology substitutionsabbr, meaning = term.split("means", 1)abbr = abbr.strip()meaning = meaning.strip()if abbr in enhanced_question:enhanced_question = enhanced_question.replace(abbr, f"{abbr} ({meaning})")# Get answer from data platform with the enhanced questionresponse_data = await self.denodo_client.answer_question(enhanced_question,self.current_user["username"],self.current_user["password"])# Format and return the response# Extract memories from the conversationmemories = await self.agent.extract_memories([user_message, response])# Update memoriesawait self.agent.update_memories(memories)return response

此设计遵循事件溯源模式,交互生成影响未来响应的记忆。该系统不断学习,完善对用户需求的理解。

内存分类通过适当地应用每种类型来采用内存专用化

  • 查询中的术语记忆 → Token 替换
  • SQL 中的首选项内存→筛选条件
  • 度量存储器 → 计算和聚合方法
  • 一般背景→ 随访背景

此专用应用程序可确保 SQL 比通用方法更有效地整合用户特定的知识,从而显著提高查询相关性。

演示 Text2SQL 中长期记忆的强大功能

让我们看看我们的实现如何帮助两个不同的用户 Sandra 和 Alex 使用 Denodo 中的 bank 数据库与查询进行交互。

用户 1:Sandra(金融分析师)

Sandra 的第一次会议:

None

图片由作者提供

会话流程
  1. 身份验证:Sandra 登录,建立她的身份。
  2. 数据库连接:系统加载银行数据库架构以了解表和关系。
  3. 初始查询:“我们目前系统中有多少贷款?
  4. 首选项设置:“我只对未来批准的贷款感兴趣。
  5. 后续查询:“Show me the loans with the highest rate (显示利率最高的贷款)
  6. 内存应用程序:系统会自动过滤以仅显示已批准的高利率贷款。
内存系统的实际应用

右侧面板显示内存系统的工作情况:

  • 首选项内存:存储“用户仅对已批准的贷款感兴趣”并自动应用此过滤器。
  • Entity Memories:跟踪 Sandra 感兴趣的表。
  • Metric Memory:记录如何计算贷款以确保一致性。
  • 数据库上下文:维护架构知识以准确生成 SQL。

如果没有记忆,Sandra 需要反复指定 “approved loans”。系统会记住各个会话的首选项,因此 Sandra 可以在几天后返回以发现她的设置完好无损,从而创建个性化体验,而无需重新培训。

Sandra 的第 2 节(3 天后)

None

图片由作者提供

当 Sandra 问道:“让我们将高风险贷款定义为信用评分低于 750 的贷款。加州有多少高风险贷款?“,系统:

  1. 更新她的指标定义(阈值从 650 到 750)
  2. 自动应用她批准的贷款首选项
  3. 生成包含这两个首选项的 SQL:
WHERE "u"."credit_score" < 750
AND "l"."status" = 'approved'
AND "p"."state" = 'CA';
  1. 以“(注意:我已应用您之前表达的偏好)”进行确认

这演示了内存如何消除重复的交互。用户在以前的会话基础上进行构建,而无需重申首选项 - 类似于与记住会议要求的人工分析师合作。

用户 2:Alex(物业经理)

Alex 的会议:

None

图片由作者提供

此演示展示了具有长期记忆的 text2SQL 代理如何为物业经理 Alex 创建个性化体验:

  1. 区域首选项:Alex 指定他使用 California 属性,并存储为用户首选项。
  2. 自定义术语:Alex 将“豪华房产”定义为价值超过 300,000 美元的房产。
  3. 智能确认:当被问及豪华房源时,系统会确认是否应用他的加利福尼亚偏好。
  4. 个性化结果:确认后,系统会在 SQL 查询中应用两个首选项(仅限加利福尼亚 + 奢侈品 = >$300K)。

系统确认“我已应用您之前表达的偏好”——在消除重复指令的同时创造透明度。

生产性能优化

对于生产部署,请考虑以下优化:

  1. Embedding cache:存储常用的嵌入以减少 API 调用
  2. 数据库索引:确保内存表上的向量和文本索引正确
  3. 批处理:批量更新记忆,而不是单独更新
  4. 连接池:维护数据库连接池以提高吞吐量
  5. 异步执行:尽可能在后台处理内存作

text2SQL 代理的主要优势

向 text2SQL 代理添加长期内存具有以下几个优点:

  1. 个性化体验:用户无需在每次会话中重复偏好
  2. 术语一致性:自定义定义和缩写在会话中保持不变
  3. 减少摩擦:用户可以在以前的查询基础上进行构建,而无需重新建立上下文
  4. 多用户支持:每个用户都维护自己的内存上下文
  5. 更自然的交互:代理像人类一样记住重要细节
结论

长期记忆将 text2SQL 代理从基本的查询翻译器转变为随着时间的推移学习用户偏好的协作伙伴。通过实现这个受 Mem0 架构启发的内存系统,我创建了一个 text2SQL 代理,它可以记住重要的事情。

该系统从对话中提取、存储和检索重要事实,使座席能够在会话中维护上下文、记住用户偏好、学习自定义术语并提供越来越个性化的体验。这极大地改善了用户体验,同时使数据库交互感觉更自然。

虽然我使用 Denodo 作为本演示的数据平台,但相同的架构同样适用于 Snowflake、Databricks 或传统数据库。关键创新在于内存系统本身,无论底层数据平台如何,它都保持一致。

通过遵循此实施指南,您可以构建自己的具有长期记忆功能的 text2SQL 代理,从而为您的用户创建更加直观和个性化的数据交互体验。

🔗 代码与连接

此项目的完整源代码可在以下网址获得: https://github.com/xrzlizheng/Text2SQl-Agent-with-Long-Term-Memory

资源

  • https://arxiv.org/pdf/2504.19413
  • Sample Bank DataSet 基本结构来自 https://github.com/panaitescu-paul/Bank-Database-SQL-2020
http://www.lryc.cn/news/587872.html

相关文章:

  • R 语言科研绘图第 64 期 --- 哑铃图
  • 当前(2024-07-14)视频插帧(VFI)方向的 SOTA 基本被三篇顶会工作占据,按“精度-速度-感知质量”三条线总结如下,供你快速定位最新范式
  • 设计模式》》门面模式 适配器模式 区别
  • js与vue基础学习
  • Linux 基础命令详解:从入门到实践(1)
  • 基于Hadoop的竞赛网站日志数据分析与可视化(上)
  • STM32介绍和GPIO
  • Spring Boot启动原理:从main方法到内嵌Tomcat的全过程
  • Datawhale AI夏令营-基于带货视频评论的用户洞察挑战赛
  • [Python] -实用技巧4-Python中浅拷贝与深拷贝的区别详解
  • 工业软件加密锁复制:一场技术与安全的博弈
  • 借助DeepSeek编写输出漂亮表格的chdb客户端
  • 终端安全最佳实践
  • IIS错误:Service Unavailable HTTP Error 503. The service is unavailable.
  • SpringAi笔记
  • OpenCV 视频处理与摄像头操作详解
  • MySQL Innodb Cluster配置
  • 【CV综合实战】基于深度学习的工业压力表智能检测与读数系统【3】使用OpenCV读取分割后的压力表读数
  • DiffDet4SAR——首次将扩散模型用于SAR图像目标检测,来自2024 GRSL(ESI高被引1%论文)
  • vue-v-model进阶-ref-nextTick
  • 网络安全核心定律
  • 5G 到 6G通信技术的革新在哪里?
  • libimagequant windows 编译
  • 基于Python的就业数据获取与分析预测系统的设计与实现
  • Boost.Asio 异步写:为什么多次 async_write_some 会导致乱序,以及如何解决
  • 机器学习中的朴素贝叶斯(Naive Bayes)模型
  • 微软发布BioEmu模型
  • Web3:Foundry使用指南
  • 银河麒麟KYSEC安全机制详解
  • 《C++初阶之STL》【泛型编程 + STL简介】