LangGraph--基础学习(Human-in-the-loop 人工参与深入学习2)
验证人工输入
如果您需要在图本身(而不是在客户端)内验证人类提供的输入,则可以通过在单个节点内使用多个中断调用来实现。
from typing import TypedDict
import uuidfrom langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaverclass State(TypedDict):age: intdef get_valid_age(state: State) -> State:prompt = "请输入你的年龄 (必须是非负整数)."while True:user_input = interrupt(prompt)# Validate the inputtry:age = int(user_input)if age < 0:raise ValueError("年龄必须是非负整数.")break # Valid input receivedexcept (ValueError, TypeError):prompt = f"'{user_input}' 无效,请输入非负整数年龄."return {"age": age}# Node that uses the valid input
def report_age(state: State) -> State:print(f"✅ Human is {state['age']} years old.")return state# Build the graph
builder = StateGraph(State)
builder.add_node("get_valid_age", get_valid_age)
builder.add_node("report_age", report_age)builder.set_entry_point("get_valid_age")
builder.add_edge("get_valid_age", "report_age")
builder.add_edge("report_age", END)# Create the graph with a memory checkpointer
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)# Run the graph until the first interrupt
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)
print(result["__interrupt__"]) # First prompt: "Please enter your age..."print("----------------------------------")# Simulate an invalid input (e.g., string instead of integer)
result = graph.invoke(Command(resume="not a number"), config=config)
print(result["__interrupt__"]) # Follow-up prompt with validation messageprint("----------------------------------")
# Simulate a second invalid input (e.g., negative number)
result = graph.invoke(Command(resume="-10"), config=config)
print(result["__interrupt__"]) # Another retryprint("----------------------------------")
# Provide valid input
final_result = graph.invoke(Command(resume="25"), config=config)
print(final_result) # Should include the valid age
[Interrupt(value='请输入你的年龄 (必须是非负整数).', resumable=True, ns=['get_valid_age:1c6582a8-3763-450b-d886-ea2bd687e862'])]
----------------------------------
[Interrupt(value="'not a number' 无效,请输入非负整数年龄.", resumable=True, ns=['get_valid_age:1c6582a8-3763-450b-d886-ea2bd687e862'])]
----------------------------------
[Interrupt(value="'-10' 无效,请输入非负整数年龄.", resumable=True, ns=['get_valid_age:1c6582a8-3763-450b-d886-ea2bd687e862'])]
----------------------------------
✅ Human is 25 years old.
{'age': 25}
Resume 使用 Command
原语
当在图形中使用中断
函数时,执行在该点暂停并等待用户输入。
提供对中断
的响应: 要继续执行,请使用 Command(resume=value)
传递用户的输入。该图从中断(.)的节点开始恢复执行最初被称为。这一次, 中断
函数将返回 Command(resume=value)
中提供的值,而不是再次暂停。
如何从中断中恢复工作?
使用中断
的一个关键方面是理解恢复是如何工作的。当您在中断
后恢复执行时,图形执行将从触发最后一次中断
的图形节点的开头开始。
从节点开始到中断
的所有代码都将重新执行。
counter = 0
def node(state: State):# All the code from the beginning of the node to the interrupt will be re-executed# when the graph resumes.global countercounter += 1print(f"> Entered the node: {counter} # of times")# Pause the graph and wait for user input.answer = interrupt()print("The value of counter is:", counter)...
恢复图形后,计数器将第二次递增,导致以下输出:
> Entered the node: 2 # of times
The value of counter is: 2
通过一次调用恢复多个中断
如果任务队列中有多个中断,则可以使用 Command.resume,并将
中断 id 映射到 resume 值的字典中,以通过单个 invoke
/stream
调用来恢复多个中断。
例如,一旦你的图表被中断(理论上是多次)并停止:
resume_map = {i.interrupt_id: f"human input for prompt {i.value}"for i in parent.get_state(thread_config).interrupts
}parent_graph.invoke(Command(resume=resume_map), config=thread_config)
常见陷阱
因为中断的重新执行是通过中断节点进行重新执行的,所以如果存在api调用或者任务调用在中断之前调用,则会重复调用,如下:
from langgraph.types import interruptdef human_node(state: State):"""Human node with validation."""api_call(...) # This code will be re-executed when the node is resumed.answer = interrupt(question)
上面的api_call代码就会重复调用,比较好的处理方法是把业务执行代码放到中断后,或者单独放到其他节点,如:
from langgraph.types import interruptdef human_node(state: State):"""Human node with validation."""answer = interrupt(question)api_call(answer) # OK as it's after the interrupt
from langgraph.types import interruptdef human_node(state: State):"""Human node with validation."""answer = interrupt(question)return {"answer": answer}def api_call_node(state: State):api_call(...) # OK as it's in a separate node
调用的子图函数
当调用子图作为函数时, 父图将从调用子图的节点的开始处 (以及触发中断
的节点)恢复执行。类似地, 子图将从调用 interrupt()
函数的节点的开始处恢复。例如:
def node_in_parent_graph(state: State):some_code() # <-- This will re-execute when the subgraph is resumed.# Invoke a subgraph as a function.# The subgraph contains an `interrupt` call.subgraph_result = subgraph.invoke(some_input)
父图和子图执行流
假设我们有一个有3个节点的父图:
父图:node_1 → node_2(子图调用)→ node_3
子图有 3 个节点,其中第二个节点包含中断:
子图:sub_node_1 → sub_node_2(中断)→ sub_node_3
当恢复图形时,执行将按如下方式进行:
跳过父图形node_1 (已执行,图形状态已保存在快照中)。
在父图形node_2中从头开始重新执行,因为中断是在父图的node_2开始的。
跳过子图sub_node_1 (已执行,图状态已保存在快照中)。
在子图sub_node_2中从头开始重新执行。
继续处理 sub_node_3 和后续节点。
下面是一段简短的示例代码,您可以使用它来理解子图如何处理中断。 它计算每个节点被输入的次数并打印计数。
import uuid
from typing import TypedDictfrom langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaverclass State(TypedDict):"""The graph state."""state_counter: intcounter_node_in_subgraph = 0def node_in_subgraph(state: State):"""A node in the sub-graph."""global counter_node_in_subgraphcounter_node_in_subgraph += 1 # This code will **NOT** run again!print(f"Entered `node_in_subgraph` a total of {counter_node_in_subgraph} times")counter_human_node = 0def human_node(state: State):global counter_human_nodecounter_human_node += 1 # This code will run again!print(f"Entered human_node in sub-graph a total of {counter_human_node} times")answer = interrupt("what is your name?")print(f"Got an answer of {answer}")checkpointer = MemorySaver()subgraph_builder = StateGraph(State)
subgraph_builder.add_node("some_node", node_in_subgraph)
subgraph_builder.add_node("human_node", human_node)
subgraph_builder.add_edge(START, "some_node")
subgraph_builder.add_edge("some_node", "human_node")
subgraph = subgraph_builder.compile(checkpointer=checkpointer)counter_parent_node = 0def parent_node(state: State):"""This parent node will invoke the subgraph."""global counter_parent_nodecounter_parent_node += 1 # This code will run again on resuming!print(f"Entered `parent_node` a total of {counter_parent_node} times")# Please note that we're intentionally incrementing the state counter# in the graph state as well to demonstrate that the subgraph update# of the same key will not conflict with the parent graph (untilsubgraph_state = subgraph.invoke(state)return subgraph_statebuilder = StateGraph(State)
builder.add_node("parent_node", parent_node)
builder.add_edge(START, "parent_node")# A checkpointer must be enabled for interrupts to work!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)config = {"configurable": {"thread_id": uuid.uuid4(),}
}for chunk in graph.stream({"state_counter": 1}, config):print(chunk)print('--- Resuming ---')for chunk in graph.stream(Command(resume="35"), config):print(chunk)
Entered `parent_node` a total of 1 times
Entered `node_in_subgraph` a total of 1 times
Entered human_node in sub-graph a total of 1 times
{'__interrupt__': (Interrupt(value='what is your name?', resumable=True, ns=['parent_node:4c3a0248-21f0-1287-eacf-3002bc304db4', 'human_node:2fe86d52-6f70-2a3f-6b2f-b1eededd6348'], when='during'),)}
--- Resuming ---
Entered `parent_node` a total of 2 times
Entered human_node in sub-graph a total of 2 times
Got an answer of 35
{'parent_node': {'state_counter': 1}}
使用多个中断
在单个节点中使用多个中断对于验证人工输入等模式很有帮助。 但是,如果不小心处理,在同一节点中使用多个中断可能会导致意外行为。
当一个节点包含多个中断调用时,LangGraph 会保留一个特定于执行该节点的任务的恢复值列表。每当执行恢复时,它都从节点的开头开始。对于遇到的每个中断,LangGraph 检查任务的恢复列表中是否存在匹配的值。匹配是严格基于索引的 ,因此节点内中断调用的顺序至关重要。
为了避免出现问题,请避免在执行之间动态更改节点的结构。这包括添加、删除或重新排序中断调用,因为此类更改可能导致索引不匹配。这些问题通常来自非常规模式,例如通过 Command(resume=..., update=SOME_STATE_MUTATION)
改变状态或依赖全局变量动态修改节点的结构。
import uuid
from typing import TypedDict, Optionalfrom langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaverclass State(TypedDict):"""The graph state."""age: Optional[str]name: Optional[str]def human_node(state: State):if not state.get('name'):name = interrupt("what is your name?")else:name = "N/A"if not state.get('age'):age = interrupt("what is your age?")else:age = "N/A"print(f"Name: {name}. Age: {age}")return {"age": age,"name": name,}builder = StateGraph(State)
builder.add_node("human_node", human_node)
builder.add_edge(START, "human_node")# A checkpointer must be enabled for interrupts to work!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)config = {"configurable": {"thread_id": uuid.uuid4(),}
}for chunk in graph.stream({"age": None, "name": None}, config):print(chunk)for chunk in graph.stream(Command(resume="John", update={"name": "foo"}), config):print(chunk)
{'__interrupt__': (Interrupt(value='what is your name?', resumable=True, ns=['human_node:3a007ef9-c30d-c357-1ec1-86a1a70d8fba'], when='during'),)}
Name: N/A. Age: John
{'human_node': {'age': 'John', 'name': 'N/A'}}
上面可发现处理多个中断时遇到的问题是第二个中断没有被正确触发。这是因为当恢复第一个中断时,整个节点会重新执行,而状态已经被部分更新。所以需要一个一个的按照顺序处理中断
下面这里处理中断就可以了:
for chunk in graph.stream({"age": None, "name": None}, config):print(chunk)# for chunk in graph.stream(Command(resume=["John","30"], update={"name": "foo"}), config):
# print(chunk)
print("\nResume with name:")
for chunk in graph.stream(Command(resume="John"), config):print(chunk)# Resume with age
print("\nResume with age:")
for chunk in graph.stream(Command(resume="25"), config):print(chunk)
{'__interrupt__': (Interrupt(value='what is your name?', resumable=True, ns=['human_node:c2c5393e-577a-2c51-7f50-64735a1a92fd']),)}Resume with name:
{'__interrupt__': (Interrupt(value='what is your age?', resumable=True, ns=['human_node:c2c5393e-577a-2c51-7f50-64735a1a92fd']),)}Resume with age:
Name: John. Age: 25
{'human_node': {'age': '25', 'name': 'John'}}