当前位置: 首页 > news >正文

LangGraph认知篇-Command函数

Command简述

        在 LangGraph 中,Command 是一个极具实用性的功能,它能够将控制流(边)和状态更新(节点)巧妙地结合起来。这意味着开发者可以在同一个节点中,既执行状态更新操作,又决定下一个要前往的节点,为工作流的构建带来了极大的灵活性。

Command基本用法

Command 允许你在单个节点函数中完成两件关键任务:

  1. 更新图状态 (update): 修改共享的 State 对象。

  2. 指定下一节点 (goto): 显式决定工作流下一步执行哪个节点。

def my_node(state: State) -> Command[Literal["my_other_node"]]:​return Command(​# 状态更新​update={"foo": "bar"},​# 控制流​goto="my_other_node"​)

        在这个示例中,my_node 函数返回一个 Command 对象,

  • update 参数用于指定状态的更新内容,将状态中的 "foo" 键值设为 "bar";

  • goto 参数则决定了下一个要前往的节点是 "my_other_node"。

关键约束

        在节点函数中返回 Command 时,必须添加返回类型注释,注明该节点可路由到的节点名称列表,如 Command[Literal["my_other_node"]]。这一点至关重要,它不仅是 graph 渲染所必需的,还能明确告知 LangGraph 该节点可以导航到 "my_other_node"。

Command 核心应用场景

动态控制流(替代条件边)

        直接在节点逻辑中根据状态判断跳转路径,代码更内聚:

def check_threshold(state: State) -> Command[Literal["proceed", "halt"]]:if state["value"] > state["threshold"]:return Command(goto="halt")  # 无需更新状态时update可省略return Command(goto="proceed")

Command 与条件边的决策

  • Command当您需要同时更新图形状态和路由到其他节点时使用。例如,在实现多代理切换时,需要路由到其他代理并向该代理传递一些信息。

  • 仅需根据当前状态选择分支且不涉及状态修改(如循环判断、简单路由),使用条件边更清晰。

跨层级节点跳转(子图 → 父图)

        实现跨子图的工作流跳转,需指定 graph=Command.PARENT,代码示例如下:

from typing import Literal, Optional, Annotated
from operator import addfrom langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
from pydantic import BaseModel# 定义子图状态
class SubgraphState(BaseModel):"""子图状态定义"""issue: strescalated: bool = False  # 与父图共享的状态字段# 定义父图状态 reducer(处理子图与父图的状态冲突)
def escalated_reducer(left: bool, right: bool) -> bool:"""共享字段escalated的合并策略:右侧值(子图更新)优先"""return right# 定义父图状态
class ParentGraphState(BaseModel):"""父图状态定义"""issue: strescalated: Annotated[bool, escalated_reducer] = False  # 使用reducer注解resolution: Optional[str] = None# 子图节点
def subgraph_worker(state: SubgraphState) -> Command[Literal["manager_approval", END]]:"""子图中的工作节点,决定是否升级到父图"""if "critical" in state.issue.lower():return Command(update={"escalated": True},  # 更新共享状态goto="manager_approval",  # 父图中的目标节点graph=Command.PARENT  # 关键:指定跳转到父图)return Command(goto=END)  # 不升级则直接结束子图# 构建子图
subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node("worker", subgraph_worker)
subgraph_builder.add_edge(START, "worker")
subgraph = subgraph_builder.compile()# 父图节点
def manager_approval_node(state: ParentGraphState) -> ParentGraphState:"""父图中的经理审批节点"""return ParentGraphState(**state.model_dump(), resolution="经理已审批处理")# 构建父图(包含子图)
parent_builder = StateGraph(ParentGraphState)
parent_builder.add_node("subgraph", subgraph)  # 嵌入子图
parent_builder.add_node("manager_approval", manager_approval_node)# 定义父图边
parent_builder.add_edge(START, "subgraph")
parent_builder.add_edge("manager_approval", END)# 编译父图
parent_graph = parent_builder.compile(checkpointer=MemorySaver()
)# 运行示例
if __name__ == "__main__":print("=== 跨层级跳转测试 ===")# 测试会触发升级的情况result1 = parent_graph.invoke({"issue": "Critical error: system down"})print(f"测试1 - 触发升级: {result1}")# 应输出包含escalated=True和经理审批结果的状态# 测试不会触发升级的情况result2 = parent_graph.invoke({"issue": "Minor issue: slow response"})print(f"测试2 - 不触发升级: {result2}")# 应输出escalated=False且无经理审批结果的状态
  • 状态同步注意:若父子图共享状态字段,需在父图状态中为该字段定义 reducer 函数处理冲突。

工具调用与状态注入

        一个常见的工具调用场景是从工具内部更新图谱状态。例如,在客户支持应用中,您可能希望在对话开始时根据客户的账号或 ID 查找客户信息。要从工具中更新图谱状态,您可以从工具中返回 Command(update={"my_custom_key": "foo", "messages": [...]})。代码示例如下:

from typing import Annotated, Dict, Any, List
from pydantic import BaseModel
from operator import addfrom langgraph.graph import StateGraph, START, END, Command
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.tools import tool
from langchain_core.messages import AIMessage, ToolMessage# 定义消息列表的reducer
def add_messages(left: List[Any], right: List[Any]) -> List[Any]:"""消息列表的合并策略"""return left + right# 1. 定义状态模型
class SupportState(BaseModel):user_id: str  # 客户IDuser_info: Annotated[Dict[str, Any], add] = {}  # 工具返回的用户信息messages: Annotated[List[Any], add_messages] = []  # 消息历史(必须包含工具调用记录)query_status: str = "pending"  # 流程状态标记# 2. 定义工具函数
@tool
def lookup_user_info(tool_call_id: Annotated[str, "tool_call_id"]) -> Command:"""根据用户ID查询客户信息(内部工具)"""# 假设从线程配置中获取user_id(实际中可从config获取)# 注意:此处简化处理,实际需注入configuser_id = "CUST-789"  # 模拟获取print(f"正在查询用户信息: {user_id}")# 模拟实际查询逻辑(可替换为数据库/API调用)user_data = {"user_id": user_id,"name": "陈明亮","membership": "白金会员","contact": "chen@example.com","recent_tickets": "订单延迟问题"}print(f"查询结果: {user_data}")# 返回Command更新状态return Command(update={"user_info": user_data,"messages": [ToolMessage(content=f"已获取用户 {user_id} 的详细信息",tool_call_id=tool_call_id)],"query_status": "user_info_fetched"})# 3. 定义触发工具调用的节点
def call_tool(state: SupportState) -> SupportState:"""创建工具调用消息的节点"""print(f"\n===== 触发工具调用 =====")tool_call_message = AIMessage(content="",tool_calls=[{"name": "lookup_user_info","args": {},  # 无额外参数"id": "call_123"}])return SupportState(messages=state.messages + [tool_call_message])# 4. 定义后续处理节点
def handle_support_request(state: SupportState) -> SupportState:"""处理客户请求的节点(依赖工具返回的用户信息)"""print(f"\n===== 开始处理客户请求 =====")print(f"客户ID: {state.user_id}")print(f"客户信息: {state.user_info}")print(f"当前状态: {state.query_status}")print(f"消息记录: {state.messages}")# 基于用户信息进行业务处理if state.user_info.get("membership") == "白金会员":priority_msg = "优先处理白金会员请求"else:priority_msg = "标准流程处理请求"# 更新最终状态return SupportState(user_id=state.user_id,user_info=state.user_info,messages=state.messages + [priority_msg, "客户请求处理完成"],query_status="completed")# 5. 构建图谱(使用ToolNode处理Command)
def build_support_graph():builder = StateGraph(SupportState)# 创建ToolNodetool_node = ToolNode([lookup_user_info])# 添加节点builder.add_node("call_tool", call_tool)builder.add_node("tools", tool_node)builder.add_node("process_request", handle_support_request)# 定义边builder.add_edge(START, "call_tool")builder.add_edge("call_tool", "tools")builder.add_edge("tools", "process_request")builder.add_edge("process_request", END)return builder.compile(checkpointer=MemorySaver())# 6. 运行示例
if __name__ == "__main__":support_graph = build_support_graph()print("=== 客户支持流程开始 ===")result = support_graph.invoke({"user_id": "CUST-789"})print("\n===== 流程结束 =====")print(f"最终用户信息: {result['user_info']['name']} ({result['user_info']['membership']})")print(f"流程完成状态: {result['query_status']}")print(f"完整消息历史: {result['messages']}")
  • @Tool 工具Command对象返回值:

    • update参数需包含业务数据(如用户信息、查询结果)和消息历史

    • messages字段必须包含ToolMessage,且需关联对应的tool_call_id
      (这是 LLM 提供商要求的格式,确保工具调用与结果在消息链中正确关联)

  • ToolNode 的核心作用:

    • 自动 Command 处理:解析工具返回的 Command 对象并更新状态

    • 消息历史维护:自动添加 ToolMessage 到消息序列

    • 错误处理:内置工具执行异常捕获机制

    • ID 管理:自动处理 tool_call_id 的生成和关联

人机协同工作流(Human-in-the-Loop)

        Command还可以与 interrupt() 结合实现人工审核中断与恢复:

from typing import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph
from langgraph.types import Command, interrupt# 新增模型相关导入
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
load_dotenv()# 定义状态类型
class State(TypedDict):some_text: str# 初始化大模型(需要设置OPENAI_API_KEY环境变量)
model = ChatOpenAI(model="gpt-4o-mini")# 初始化检查点存储
checkpointer = MemorySaver()# 定义人类介入节点
def human_node(state: State):value = interrupt({"text_to_revise": state["some_text"],"instructions": "请修改以下文本:"})return {"some_text": value}# 修改后的自动处理节点(调用大模型)
def process_text(state: State):# 构造模型请求message = model.invoke([HumanMessage(content=f"请处理以下请求:{state['some_text']}。保持回答简洁。")])# 返回模型生成的文本return {"some_text": message.content}# 构建工作流
graph_builder = StateGraph(State)# 添加节点
graph_builder.add_node("human_review", human_node)
graph_builder.add_node("auto_process", process_text)# 设置流程
graph_builder.set_entry_point("auto_process")
graph_builder.add_edge("auto_process", "human_review")# 编译图表
graph = graph_builder.compile(checkpointer=checkpointer,interrupt_before=["human_review"]
)# 使用示例(保持不变)
if __name__ == "__main__":thread_id = "thread_123"thread_config = {"configurable": {"thread_id": thread_id}}initial_state = {"some_text": "输出一个五五乘法表"}result = graph.invoke(initial_state, config=thread_config)print("自动处理结果:", result["some_text"])human_input = input("请输入人类输入:")resume_result = graph.invoke(Command(resume=human_input),config=thread_config)print("最终结果:", resume_result["some_text"])

最佳实践与注意事项

  1. 类型标注不可少-> Command[Literal["node_a", "node_b"]] 是 LangGraph 静态检查和绘图的基础。

  2. 跨图状态设计:子图跳转父图更新共享状态时,父图需定义 reducer 处理冲突(如 lambda current, update: update)。

  3. 命名一致性goto 指定的节点名必须与图中注册的节点名完全一致。

  4. 工具安全调用:在工具中使用 Command 时,确保状态更新不会破坏图的一致性。

参考文献

Overview

http://www.lryc.cn/news/608173.html

相关文章:

  • UDP通信中BIND端口号的作用解析,LOCALPORT的关系解析
  • 搜索与图论(最小生成树 二分图)
  • 【Django】-5- ORM的其他用法
  • 企业级单点登录(SSO)技术详解:从原理到安全实践
  • 前端与后端部署大冒险:Java、Go、C++三剑客
  • ARM Cortex-M异常处理高级特性详解
  • 集成电路学习:什么是HAL硬件抽象层
  • 【DL学习笔记】计算图与自动求导
  • 紧急救援!Oracle RAC节点驱逐元凶:私网Packet Reassembles Failed“包重组失败”一招救命
  • linux ssh公钥移除办法
  • MySQL PostgreSQL JDBC URL 配置允许批量操作
  • SM2国密算法的大数运算原理详解
  • 牛客 - 旋转数组的最小数字
  • 【PCL点云库:下】
  • 详解Python标准库之互联网数据处理
  • 一个物理引擎仿真器(mujoco这种)的计算流程
  • 回溯 79 单词搜索一波三折想和
  • 中科院开源HYPIR图像复原大模型:1.7秒,老照片变8K画质
  • 深入剖析Nacos:云原生架构的基石
  • JVM 02 垃圾回收
  • 【LeetCode 热题 100】(三)滑动窗口
  • file命令libmagic、python的cchardet库使用、自定义magic文件的使用
  • 【Spring Boot 快速入门】五、文件上传
  • Python 入门指南:从零基础到环境搭建
  • Qt 信号和槽正常连接返回true,但发送信号后槽函数无响应问题【已解决】
  • AI原生数据库:告别SQL的新时代来了?
  • 飞书推送工具-自动化测试发送测试报告一种方式
  • Linux 动静态库的制作和使用
  • [硬件电路-121]:模拟电路 - 信号处理电路 - 模拟电路中常见的难题
  • FastAPI--一个快速的 Python Web