Langgraph 学习教程
目录
LangGraph 的核心作用
Langgraph常见示例场景:
基础工作流构建:线性工作流 顺序执行
条件分支: 根据状态选择动态路由
循环控制,固定次数循环
并行处理: 多节点并行处理
消息传递: 聊天对话流
错误处理:捕获并处理异常
关键点总结
构建工作流来调用工具,来访问本地LLM
可视化工作流 调试建议
调用工具访问LLM 案例
多轮对话系统案例
LangGraph多代理协作中实现固定循环次数后自动停止
LangGraph 的核心作用
LangGraph 是 LangChain 生态系统中的一个库,专门用于构建有状态、多参与者的AI工作流。它的核心价值在于:
功能 | 说明 |
有状态工作流 | 支持长时间运行的对话或任务,保持中间状态(如多轮对话的上下文) |
多参与者协作 | 允许多个AI代理、工具或人类用户协同工作 |
循环与分支控制 | 实现复杂逻辑(如条件判断、循环迭代) |
可视化调试 | 提供工作流可视化工具,便于开发和故障排查 |
与普通LangChain的关键区别
特性 | LangChain Chain | LangGraph |
状态管理 | 无状态(单次执行) | 有状态(跨步骤持久化) |
复杂性 | 适合线性流程 | 适合带分支/循环的复杂流程 |
多参与者支持 | 需手动协调 | 原生支持多代理协作 |
典型应用 | 简单QA、检索增强 | 复杂决策、多轮交互 |
LangGraph 特别适合需要长期记忆或多角色协作的AI应用,是企业级复杂工作流编排的理想选择。对于简单任务,传统的LangChain Chain仍是更轻量级的方案。
Langgraph常见示例场景:
基础工作流构建:线性工作流 顺序执行
from langgraph.graph import StateGraph, END, START
from typing import TypedDict# 定义状态类型
class State(TypedDict):input: stroutput: str# 定义节点函数
def node1(state: State):return {"output": state["input"].upper()}def node2(state: State):return {"output": state["output"] + "!"}# 构建工作流
workflow = StateGraph(State)
workflow.add_node("uppercase", node1)
workflow.add_node("add_exclamation", node2)# 设置执行顺序
workflow.add_edge("uppercase", "add_exclamation")
workflow.add_edge("add_exclamation", END)workflow.add_edge(START, "uppercase")
# 编译并运行
app = workflow.compile()
result = app.invoke({"input": "hello"})
print(result["output"])# 运行结果: HELLO!
条件分支: 根据状态选择动态路由
from typing import Literal, TypedDict
from langgraph.graph import StateGraph, START, ENDclass BranchState(TypedDict):value: intpath: Literal["even", "odd"]def process_number(state: BranchState):return {"value": state["value"] * 2}def even_path(state: BranchState):return {"path": "even"}def odd_path(state: BranchState):return {"path": "odd"}workflow = StateGraph(BranchState)
workflow.add_node("process", process_number)
workflow.add_node("even", even_path)
workflow.add_node("odd", odd_path)# 条件分支函数
def decide_path(state: BranchState):return "even" if state["value"] % 2 == 0 else "odd"workflow.add_conditional_edges("process",decide_path,{"even": "even", "odd": "odd"} # 分支映射
)
workflow.add_edge("even", END)
workflow.add_edge("odd", END)
workflow.add_edge(START,'process')app = workflow.compile()
# 测试
print( app.invoke({"value": 3}))
print( app.invoke({"value": 3.1}))# 输出:
# {"value": 6, "path": "even"}
# {'value': 6.2, 'path': 'odd'}
循环控制,固定次数循环
from typing import Literal, TypedDict
from langgraph.graph import StateGraph, START, ENDclass LoopState(TypedDict):counter: intmax_count: intdef increment(state: LoopState):return {"counter": state["counter"] + 1}def should_continue(state: LoopState):return "continue" if state["counter"] < state["max_count"] else "end"workflow = StateGraph(LoopState)
workflow.add_node("increment", increment)# 循环条件
workflow.add_conditional_edges("increment",should_continue,{"continue": "increment", "end": END}
)
workflow.add_edge(START, 'increment')
app = workflow.compile()
result = app.invoke({"counter": 0, "max_count": 3})
print(result["counter"])
# 输出: 3
并行处理: 多节点并行处理
from typing import Literal, TypedDict, List
from langgraph.graph import StateGraph, START, ENDclass ParallelState(TypedDict):inputs: List[str]results: List[str]def task_a(state: ParallelState):return {"results": [s.upper() for s in state["inputs"]]}def task_b(state: ParallelState):return {"results": [s.lower() for s in state["inputs"]]}def combine(state: ParallelState):return {"results": [f"A:{a}, B:{b}" for a, b in zip(state["results"], state["results"])]}workflow = StateGraph(ParallelState)
workflow.add_node("upper", task_a)
workflow.add_node("lower", task_b)
workflow.add_node("combine", combine)# 并行执行 A 和 B
workflow.add_edge("upper", "combine")
workflow.add_edge("lower", "combine")
workflow.add_edge("combine", END)
workflow.add_edge(START,'upper')
app = workflow.compile()
result = app.invoke({"inputs": ["Hello", "World"]})
print(result)
# 运行结果:
# {'inputs': ['Hello', 'World'], 'results': ['A:HELLO, B:HELLO', 'A:WORLD, B:WORLD']}
消息传递: 聊天对话流
from typing import Literal, TypedDict, List
from langgraph.graph import StateGraph, START, END
from langgraph.graph import MessageGraph
from langchain_core.messages import HumanMessage, AIMessagedef responder(messages: List[HumanMessage]):last_msg = messages[-1].contentreturn AIMessage(content=f"你说了: {last_msg}")graph = MessageGraph()
graph.add_node("responder", responder)
graph.add_edge("responder", END)
graph.set_entry_point("responder")app = graph.compile()
result = app.invoke(HumanMessage(content="你好!"))
print(result[-1].content)
# 输出: 你说了: 你好!
错误处理:捕获并处理异常
from typing import Literal, TypedDict, List
from langgraph.graph import StateGraph, START, ENDclass ErrorState(TypedDict):input: stroutput: strerror: strdef risky_task(state: ErrorState):try:if "fail" in state["input"]:raise ValueError("模拟错误")return {"output": state["input"].upper()}except Exception as e:return {"error": str(e)}def handle_error(state: ErrorState):return {"output": f"错误处理: {state['error']}"}workflow = StateGraph(ErrorState)
workflow.add_node("task", risky_task)
workflow.add_node("handler", handle_error)def check_error(state: ErrorState):return "handler" if "error" in state else ENDworkflow.add_conditional_edges("task",check_error,{"handler": "handler", "end": END}
)
workflow.add_edge(START,'task')workflow.add_edge("handler", END)app = workflow.compile()
result = app.invoke({"input": "fail"})
print(result["output"])
# 运行结果: 错误处理: 模拟错误
关键点总结
功能 | 核心方法/类 | 典型应用场景 |
线性流程 | add_node(),add_edge() | 简单顺序任务 |
条件分支 | add_conditional_edges() | 动态路由决策 |
循环控制 | 条件边 + 状态判断 | 迭代处理 |
并行执行 | 多入口边 | 独立任务并行 |
工具调用 | ToolNode | 集成外部工具(搜索/API) |
消息传递 | MessageGraph | 多轮对话系统 |
错误处理 | try-catch + 条件边 | 容错工作流 |
构建工作流来调用工具,来访问本地LLM
执行流程: 初始化Ollama 大模型 -> 定义状态类型 -> 定义工具函数 -> 定义节点
-> 构建工作流(添加节点,设置入口点,设置条件路由,设置终止边) -> 编译测试
from langgraph.graph import StateGraph, START, END
from langchain_community.chat_models import ChatOllama
from typing import TypedDict, Literal, Any# 1. 初始化Ollama
llm = ChatOllama(model="qwen", temperature=0.7)# 2. 定义状态类型
class QAState(TypedDict):question: stranswer: strneeds_tool: Literal["yes", "no"]# 3. 定义工具函数
def calculator(expression: str) -> str | Any:"""计算数学表达式"""try:return eval(expression) # 生产环境应使用更安全的方式except:return "计算错误"# 4. 定义节点函数
def decide_tool_use(state: QAState):"""判断是否需要工具"""question = state["question"]needs_tool = "yes" if "计算" in question else "no"return {"needs_tool": needs_tool}def call_tool(state: QAState):"""调用计算工具"""expr = state["question"].split("计算")[1].strip()result = calculator(expr)return {"answer": f"计算结果: {result}"}def generate_directly(state: QAState):"""直接生成回答"""response = llm.invoke(state["question"])return {"answer": response.content}# 5. 构建工作流
workflow = StateGraph(QAState)# 添加节点
workflow.add_node("decide", decide_tool_use)
workflow.add_node("use_tool", call_tool)
workflow.add_node("generate", generate_directly)# 设置入口点(关键修正)
workflow.set_entry_point("decide")
# 替代set_entry_point的方式
# workflow.add_edge(START, "decide") # 明确指定从START到decide节点# 设置条件路由
def route(state: QAState):if state["needs_tool"] == "yes":return "use_tool"return "generate"workflow.add_conditional_edges("decide",route,{"use_tool": "use_tool", "generate": "generate"}
)# 设置终止边
workflow.add_edge("use_tool", END)
workflow.add_edge("generate", END)
# 6. 编译并测试
app = workflow.compile()
# 测试工具调用
result = app.invoke({"question": "请计算3.14 * 5"})
print(result["answer"]) # 输出: 计算结果: 15.7# 测试直接生成
result = app.invoke({"question": "解释量子计算"})
print(result["answer"]) # 输出LLM生成的解释#####################################
运行结果:
计算结果: 15.700000000000001
计算结果: 计算错误
可视化工作流 调试建议
app = workflow.compile()
后面添加get_graph
print(app.get_graph().draw_mermaid())
---
config:flowchart:curve: linear
---
graph TD;__start__([<p>__start__</p>]):::firstdecide(decide)use_tool(use_tool)generate(generate)__end__([<p>__end__</p>]):::last__start__ --> decide;decide -.-> generate;decide -.-> use_tool;generate --> __end__;use_tool --> __end__;classDef default fill:#f2f0ff,line-height:1.2classDef first fill-opacity:0classDef last fill:#bfb6fc
调用工具访问LLM 案例
(当用户需要询问最新新闻或天气,AI大模型不能准确回答需要借助第三方工具来获取信息,这时需要添加条件边来做判断)
当需要用工具时调用第三方工具,不需要时直接回答用户问题
- 使用 ChatOllama 包装本地模型
- 决策节点判断是否需要外部搜索
- 条件路由控制是否调用工具
- 最终由LLM生成回答
from langgraph.graph import StateGraph, END,START
from langchain_community.chat_models import ChatOllama
from langchain_core.prompts import ChatPromptTemplate
from typing import TypedDict, List, Literal# 1. 初始化Ollama模型
llm = ChatOllama(model="qwen", # 使用本地已下载的模型temperature=0.7,base_url="http://localhost:11434" # Ollama默认地址
)# 2. 定义状态结构
class QAState(TypedDict):question: strcontext: stranswer: strneeds_search: Literal["yes", "no"]# 3. 定义工具函数(模拟网络搜索)
def web_search(query: str) -> str:"""模拟网络搜索工具"""print(f"正在搜索: {query}")# 这里实际可以接入SerpAPI/Tavily等真实搜索工具mock_results = {"北京最新的新闻是什么?": "今日AI大会在北京召开...","天气情况": "北京明天晴转多云,25-32℃","技术问题": "LangGraph是LangChain的工作流编排工具..."}return mock_results.get(query, "未找到相关信息")# 4. 定义工作流节点
def decide_need_search(state: QAState):"""判断是否需要搜索"""question = state["question"]# 简单逻辑:问题包含"最新"或"天气"时需要搜索needs_search = "yes" if any(kw in question for kw in ["最新", "天气"]) else "no"return {"needs_search": needs_search}def retrieve_info(state: QAState):"""调用搜索工具获取信息"""search_result = web_search(state["question"])return {"context": search_result}def generate_answer(state: QAState):"""使用LLM生成回答"""prompt = ChatPromptTemplate.from_messages([("system", "你是一个智能助手,请根据以下上下文回答问题。"),("human", "问题: {question}\n上下文: {context}")])chain = prompt | llmresponse = chain.invoke({"question": state["question"],"context": state.get("context", "无额外上下文")})return {"answer": response.content}# 5. 构建工作流
workflow = StateGraph(QAState)# 添加节点
workflow.add_node("decide", decide_need_search)
workflow.add_node("search", retrieve_info)
workflow.add_node("answer", generate_answer)# 设置条件路由
def route_based_on_decision(state: QAState):if state["needs_search"] == "yes":return "search"return "answer"workflow.add_conditional_edges("decide",route_based_on_decision,{"search": "search", "answer": "answer"}
)# 设置边
workflow.add_edge("search", "answer")
workflow.add_edge("answer", END)# 设置入口点
workflow.set_entry_point("decide")# 6. 编译工作流
app = workflow.compile()# 7. 测试不同场景
def run_qa(question):print(f"\n问题: {question}")result = app.invoke({"question": question, "context": ""})print("回答:", result["answer"])# 需要搜索的问题
run_qa("北京最新的新闻是什么?")# 不需要搜索的问题
run_qa("请用中文解释量子计算")# 运行结果:
# 正在搜索: 北京最新的新闻是什么?
# 回答: 北京最新新闻是关于今日在北京召开的AI大会的相关信息。# 问题: 请用中文解释量子计算
# 回答: 量子计算是一种利用量子力学原理... (直接由LLM生成)
多轮对话系统案例
from typing import TypedDict, List
from langchain_community.chat_models import ChatOllama # 更推荐的导入方式
from langgraph.graph import StateGraph,END
from langchain_core.prompts import ChatPromptTemplate # 添加prompt模板# 1. 定义增强的状态结构
class QAState(TypedDict):question: stranswer: strcontext: strhistory: List[dict] # 添加对话历史记录# 2. 初始化模型(带错误处理)
try:llm = ChatOllama(model="qwen",temperature=0.7,base_url="http://localhost:11434" # 显式指定地址)
except Exception as e:raise RuntimeError(f"Ollama初始化失败,请确保服务已运行: {str(e)}")# 3. 使用Prompt模板优化回答质量
prompt_template = ChatPromptTemplate.from_messages([("system", "你是一个专业的HR助手,请根据提供的上下文回答问题。"),("human", "上下文: {context}\n问题: {question}")
])# 4. 优化后的节点函数
def retrieve(state: QAState):"""检索上下文(模拟RAG场景)"""try:# 实际项目中这里应该是向量库检索simulated_context = llm.invoke(f"生成关于'{state['question']}'的3条相关知识要点").contentreturn {"context": simulated_context,"history": state["history"] + [{"type": "retrieve", "content": simulated_context}]}except Exception as e:return {"context": f"检索错误: {str(e)}"}def generate(state: QAState):"""生成最终回答"""try:chain = prompt_template | llmresponse = chain.invoke({"context": state["context"],"question": state["question"]})return {"answer": response.content,"history": state["history"] + [{"type": "answer", "content": response.content}]}except Exception as e:return {"answer": f"生成回答时出错: {str(e)}"}# 5. 构建健壮的工作流
workflow = StateGraph(QAState)# 添加节点(带节点名称校验)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)# 设置执行流程
workflow.add_edge("retrieve", "generate")
workflow.set_entry_point("retrieve") # 明确入口
workflow.add_edge("generate", END) # 明确出口# 6. 编译前验证
if not workflow.nodes:raise ValueError("工作流没有添加任何节点")# 7. 带错误处理的应用执行
try:app = workflow.compile()# 测试用例test_cases = [{"question": "如何申请年假?", "context": "", "history": []},{"question": "年假最少可以请几天?", "context": "", "history": []}]for case in test_cases:print(f"\n问题: {case['question']}")result = app.invoke(case)print(f"回答: {result['answer']}")print(f"上下文: {result['context'][:100]}...") # 显示部分上下文print(f"历史记录: {len(result['history'])}条")except Exception as e:print(f"工作流执行失败: {str(e)}")# 运行结果:
"""
问题: 如何申请年假?
回答: 要申请年假,一般需要提前向公司提交年假申请表,并附上相关的证明材料,如工作证、身份证等。
上下文: 1. 在中国,年假是指员工在一年内休假的时间。一般来说,公司会根据员工的工作年限来决定年假的天数。2. 要申请年假,一般需要提前向公司提交年假申请表,并附上相关的证明材料,如工作证、身份证等。...
历史记录: 2条问题: 年假最少可以请几天?
回答: 年假最少可以请7天。
上下文: 1. 年假是员工在一年中享有的带薪休假。在中国,年假最长可以请7天。2. 年假的计算方式因国家和公司而异。在中国,年假的计算通常基于员工的工作年限和每年的年假天数来确定。3. 在中国的劳动法中...
历史记录: 2条"""
LangGraph多代理协作中实现固定循环次数后自动停止
用户提问 → 监督代理路由 → 专业代理回答 → 计数器+1
from typing import TypedDict, Annotated, Sequence
from langgraph.graph import StateGraph, END
from langchain_community.chat_models import ChatOllama
from langchain_core.messages import HumanMessage, BaseMessage
import operator# 1. 扩展状态结构(增加循环计数器)
class AgentState(TypedDict):messages: Annotated[Sequence[BaseMessage], operator.add] # 消息历史next: str # 下一节点标记loop_count: int # 新增:循环计数器# 2. 初始化Ollama模型
llm = ChatOllama(model="qwen")# 3. 代理函数(增加计数器更新)
def tech_agent(state: AgentState) -> dict:response = llm.invoke("技术专家回答:" + state["messages"][-1].content)return {"messages": [response],"next": "supervisor","loop_count": state["loop_count"] + 1 # 计数器+1}def life_agent(state: AgentState) -> dict:response = llm.invoke("生活顾问回答:" + state["messages"][-1].content)return {"messages": [response],"next": "supervisor","loop_count": state["loop_count"] + 1}# 4. 监督代理(检查循环次数)
def supervisor(state: AgentState) -> dict:# 如果达到3次循环则终止if state["loop_count"] >= 3:return {"next": "FINISH", "loop_count": state["loop_count"]}# 正常路由逻辑last_msg = state["messages"][-1].content.lower()if any(k in last_msg for k in ["python", "代码"]):return {"next": "tech_agent", "loop_count": state["loop_count"]}elif any(k in last_msg for k in ["饮食", "健康"]):return {"next": "life_agent", "loop_count": state["loop_count"]}else:return {"next": "FINISH", "loop_count": state["loop_count"]}# 5. 构建工作流
workflow = StateGraph(AgentState)
workflow.add_node("tech_agent", tech_agent)
workflow.add_node("life_agent", life_agent)
workflow.add_node("supervisor", supervisor)workflow.add_edge("tech_agent", "supervisor")
workflow.add_edge("life_agent", "supervisor")
workflow.add_conditional_edges("supervisor",lambda state: state["next"],{"tech_agent": "tech_agent","life_agent": "life_agent","FINISH": END}
)
workflow.set_entry_point("supervisor")# 6. 编译运行(初始化计数器为0)
app = workflow.compile()# 测试(会自动在3次循环后停止)
result = app.invoke({"messages": [HumanMessage(content="Python的GIL是什么?")],"next": "supervisor","loop_count": 0 # 初始计数器
})print("最终结果:", result["messages"][-1].content)
print("循环次数:", result["loop_count"])