当前位置: 首页 > news >正文

LLM——浅谈 LangGraph 中断式工作流:构建一个可交互的问答流程

在智能体(Agent)开发中,很多场景下我们希望智能体可以在 执行到一半时暂停,等待用户输入或外部事件,再继续运行。这种 中断-恢复 的工作流模式,在 LangGraph 中被优雅地实现为 interrupt-resume 流程。

本文将以一个具体示例为核心,详细拆解 LangGraph 中断式工作流的实现原理与代码逻辑,帮助读者深入理解如何用 LangGraph 编排一个具备“中断等待用户输入,再恢复继续执行”能力的智能体工作流。

一、背景与需求场景

想象一个简单的对话机器人,依次向用户提问:

  1. 你叫什么名字?
  2. 你多大年龄了?
  3. 你从事什么职业?

每问一个问题时,机器人 暂停,等待用户输入,然后继续执行下一步,直到全部问题问完。

这个场景天然符合“中断式工作流”的需求:

  • 智能体需要暂停(interrupt)让用户输入信息。
  • 用户输入后,智能体应恢复(resume)并继续后续流程。
  • 整个过程中保持上下文状态(State)不丢失。

LangGraph 提供的 interrupt-resume 流程编排能力,可以完美实现这一场景。

二、程序架构概览

本文示例程序的整体架构分为四部分:

  1. State 定义与节点函数编写 —— 定义对话所需的状态结构和节点逻辑。
  2. 构建工作流 StateGraph —— 编排节点的执行顺序。
  3. 配置 Checkpointer 与主循环执行逻辑 —— 支持中断与恢复。
  4. 运行工作流与处理中断事件 —— 交互式输入与状态保存。

三、代码解析

1. 定义全局 State 与节点函数

class State(TypedDict):name: Optional[str]age: Optional[str]job: Optional[str]message: Optional[str]

这里定义了工作流所需的全局状态 State,每个字段对应一次用户输入。

每个“问答节点”都使用 interrupt() 发起中断,等待用户输入:

def ask_name(state: State):answer = interrupt("What is your name?")return {"name": answer}def ask_age(state: State):answer = interrupt("What is your age?")return {"age": answer}def ask_job(state: State):answer = interrupt("What is your job?")return {"job": answer}

2. 构建 StateGraph 节点与边

builder = StateGraph(State)builder.add_node("ask_name", ask_name)
builder.add_node("ask_age", ask_age)
builder.add_node("ask_job", ask_job)
builder.add_node("join", join)  # 此处join只是为了测试,哈哈,可以不添加这个节点,直接ask_job-> ENDbuilder.add_edge(START, "ask_name")
builder.add_edge("ask_name", "ask_age")
builder.add_edge("ask_age", "ask_job")
builder.add_edge("ask_job", "join")
builder.add_edge("join", END)

这段代码通过 StateGraph 明确了节点的执行顺序:
START → ask_name → ask_age → ask_job → join → END

每个节点执行完后,将状态结果传递给下一个节点。

3. 启用内存 Checkpointer 与编译图

checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

InMemorySaver 是一个简单的内存存储 Checkpointer(也可以换成 Redis、文件系统等持久化方案),用于在中断-恢复场景中保存工作流状态。

编译后的 graph 对象,具备了 LangGraph 的 流式执行与中断能力

四、核心执行流程:中断与恢复机制

state_data = {}  # 保存全局状态
resume_command = {}  # 用于保存恢复时的命令(Command)while True:interrupted = Falsestream_input = resume_command if isinstance(resume_command, Command) else state_data# graph.stream(既可以接收Command类型参数用于恢复中断后继续向下处理,又可以接收state_data这种字典类型的数据)for chunk in graph.stream(stream_input, config):# print(chunk) # 如果出现中断,输出为 {'__interrupt__': (Interrupt(value='What is your name?', resumable=True, ns=['ask_name:51e504d7-4cef-6cdf-90b4-5c3ec0d972d3']),)}if "__interrupt__" in chunk:# 遇到中断事件interrupt_event = chunk["__interrupt__"][0] # print(interrupt_event) # Interrupt(value='What is your age?', resumable=True, ns=['ask_age:cc0f05e3-e596-e6d6-f8f2-466ff809c63b'])interrupt_msg = interrupt_event.valueuser_input = input(f"{interrupt_msg} ")resume_command = Command(resume=user_input)  # 将用户的输入作为参数传到中断处interrupted = Truebreak  # 发生中断后终止本轮循环else:# print(chunk)  # 例如:{'ask_name': {'name': 'Mario'}}# 正常节点返回状态数据,合并到 state_datanew_dict = list(chunk.values())[0]if isinstance(new_dict, dict):state_data.update(new_dict)resume_command = {}if not interrupted:break  # 没有中断,流程结束# 打印最终结果
print("\n=== Conversation Completed ===")
print(state_data)
print(f"Name: {state_data.get('name')}")
print(f"Age: {state_data.get('age')}")
print(f"Job: {state_data.get('job')}")

关键点讲解

  1. 每轮循环执行 graph.stream()

    • 如果是首次执行:用空的 state_data
    • 如果是恢复执行:用 resume_command,携带用户输入数据恢复流。
  2. chunk 处理逻辑

    • 遇到 __interrupt__:工作流暂停,等待用户输入,将输入封装为 Command(resume=user_input),下一轮继续。
    • 遇到正常返回:将返回的字典合并到全局 state_data 中。
  3. resume_command 的核心作用

    • 当中断发生时,LangGraph 会生成一个 Command(resume=...) 结构来恢复。
    • 用户输入后,手动将输入封装为 Command(resume=用户输入) 传入下一轮流式调用。
    • LangGraph 会自动将 resume 的数据送回上次中断的节点继续执行。

】:中断与恢复机制看上去跟yeild和send()很像,读者朋友想要了解yeild和send()的话,可移步至 浅谈 Python 中的 yield——yield的返回值与send()的关系 这篇博客。

五、程序运行效果示例

What is your name? Mario
> Received name: Mario
What is your age? 25
> Received age: 25
What is your job? Developer
> Received job: Developer
{'name': 'Mario', 'age': '25', 'job': 'Developer'}=== Conversation Completed ===
{'name': 'Mario', 'age': '25', 'job': 'Developer', 'message': 'final'}
Name: Mario
Age: 25
Job: Developer

每次执行到 interrupt 时,程序暂停并等待用户输入,输入完成后自动恢复并执行下一个节点,直到工作流完成。

六、总结:LangGraph 中断式工作流的价值

通过本示例可以看到,LangGraph 提供了一种优雅且强大的中断-恢复编排机制,具有以下优势:

  1. 节点执行流支持随时中断与恢复 —— 适配用户输入、人工审核、外部事件等场景。
  2. 状态全程可控且一致性保障 —— 通过 Checkpointer 持久化状态,保障多轮交互一致性。
  3. 开发者掌控中断恢复的“主动权” —— 灵活定制如何暂停、如何恢复、恢复后如何继续。

这不仅适用于简单的问答流程,放大到更复杂的 人工协同 AI 任务、审批流、复杂诊断流程 中,同样适用。

七、读者可进一步拓展

  1. 状态持久化到 Redis 或数据库,支持跨设备恢复。
  2. 结合 LangChain 工具调用(Tool Calling),实现“用户输入 + 工具执行”混合流。
  3. 与 Web UI (如 Streamlit, Gradio) 集成,实现浏览器端交互。

附:完整代码

import uuid
from typing import Optional
from typing_extensions import TypedDictfrom langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command# 定义全局 State
class State(TypedDict):name: Optional[str]age: Optional[str]job: Optional[str]message: Optional[str]# 节点1:问名字
def ask_name(state: State):answer = interrupt("What is your name?")print(f"> Received name: {answer}")return {"name": answer}# 节点2:问年龄
def ask_age(state: State):answer = interrupt("What is your age?")print(f"> Received age: {answer}")return {"age": answer}# 节点3:问职业
def ask_job(state: State):answer = interrupt("What is your job?")print(f"> Received job: {answer}")return {"job": answer}def join(state: State):print(state)return {"message": "final"}# 构建 StateGraph
builder = StateGraph(State)builder.add_node("ask_name", ask_name)
builder.add_node("ask_age", ask_age)
builder.add_node("ask_job", ask_job)
builder.add_node("join", join)builder.add_edge(START, "ask_name")
builder.add_edge("ask_name", "ask_age")
builder.add_edge("ask_age", "ask_job")
builder.add_edge("ask_job", "join")
builder.add_edge("join", END)# 启用内存 Checkpointer
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)config = {"configurable": {"thread_id": str(uuid.uuid4())}
}# 主循环 — 处理中断与恢复
state_data = {}  # 保存全局状态
resume_command = {}  # 用于保存commandwhile True:interrupted = False# 每轮根据是否是恢复执行来调用 stream()stream_input = resume_command if isinstance(resume_command, Command) else state_data# graph.stream(既可以接收Command类型参数用于恢复中断后继续向下处理,又可以接收state_data这种字典类型的数据)for chunk in graph.stream(stream_input, config):# print(chunk)if "__interrupt__" in chunk:# 遇到中断事件interrupt_event = chunk["__interrupt__"][0]# print(interrupt_event)interrupt_msg = interrupt_event.value# 提示用户输入user_input = input(f"{interrupt_msg} ")# 使用用户输入恢复工作流# 只把恢复命令放到 resume_command,不动 state_dataresume_command = Command(resume=user_input)interrupted = Truebreak  # 本轮中断恢复后再继续下一轮else:# 继续更新状态new_dict = list(chunk.values())[0]if isinstance(new_dict, dict):state_data.update(new_dict)resume_command = {}if not interrupted:break  # 没有中断了,流程结束# 打印最终结果
print("\n=== Conversation Completed ===")
print(state_data)
print(f"Name: {state_data.get('name')}")
print(f"Age: {state_data.get('age')}")
print(f"Job: {state_data.get('job')}")
http://www.lryc.cn/news/611758.html

相关文章:

  • Effective C++ 条款26: 尽可能延后变量定义式的出现时间
  • RN项目环境搭建和使用-Mac版本(模拟器启动不起来的排查)
  • Solidity 编程进阶
  • 阿里国际招AI产品经理咯
  • 用 “私房钱” 类比闭包:为啥它能访问外部变量?
  • Google Chrome <139.0.7236.0 UAF漏洞
  • RabbitMQ面试精讲 Day 12:镜像队列与Quorum队列对比
  • MATLAB下载教程MATLAB R2025a 保姆级安装步骤(附安装包)
  • 双馈和永磁风机构网型跟网型联合一次调频并入同步机电网,参与系统一次调频,虚拟惯量下垂,虚拟同步机VSG控制matlab/simulink
  • matlab——simulink学习(5向NXP库中添加新模块)
  • 计算机网络:如何判断B或者C类IP地址是否划分了子网
  • Linux之Shell脚本基本语法
  • 3步学会使用渲染101--3DMAX云渲染
  • 【计算机网络 | 第3篇】物理媒介
  • 【数据结构与算法-Day 12】深入浅出栈:从“后进先出”原理到数组与链表双实现
  • 探索Linux MMC子系统的奥秘
  • TypeScript 元组类型精简知识点
  • 大数据存储域——HDFS存储系统
  • MCP协议与Spring AI框架实战
  • NY112NY117美光固态闪存NY119NY123
  • 新手向:Python实现简易计算器
  • 疯狂星期四文案网第30天运营日记
  • mysql索引的用法
  • Suno API V5模型 python源码 —— 使用灵感模式进行出创作
  • 国产3D大型装配设计新突破①:图纸打开设计双加速 | 中望3D 2026
  • rsync 的三种常见用法
  • 学习bug
  • jmm 指令重排 缓存可见性 Volatile 内存屏障
  • word2vector细致分解(CBOW, SKIP_GRAM, 层次soft Max, 负采样)
  • linux创建虚拟内存