当前位置: 首页 > news >正文

Langgraph 学习教程

目录

LangGraph 的核心作用

Langgraph常见示例场景:

基础工作流构建:线性工作流  顺序执行

条件分支: 根据状态选择动态路由

循环控制,固定次数循环

并行处理: 多节点并行处理

消息传递: 聊天对话流

错误处理:捕获并处理异常

关键点总结

构建工作流来调用工具,来访问本地LLM

可视化工作流  调试建议

调用工具访问LLM 案例

多轮对话系统案例

LangGraph多代理协作中实现固定循环次数后自动停止


LangGraph 的核心作用

LangGraph 是 LangChain 生态系统中的一个库,专门用于构建有状态、多参与者的AI工作流。它的核心价值在于:

功能

说明

有状态工作流

支持长时间运行的对话或任务,保持中间状态(如多轮对话的上下文)

多参与者协作

允许多个AI代理、工具或人类用户协同工作

循环与分支控制

实现复杂逻辑(如条件判断、循环迭代)

可视化调试

提供工作流可视化工具,便于开发和故障排查

与普通LangChain的关键区别

特性

LangChain Chain

LangGraph

状态管理

无状态(单次执行)

有状态(跨步骤持久化)

复杂性

适合线性流程

适合带分支/循环的复杂流程

多参与者支持

需手动协调

原生支持多代理协作

典型应用

简单QA、检索增强

复杂决策、多轮交互

LangGraph 特别适合需要长期记忆或多角色协作的AI应用,是企业级复杂工作流编排的理想选择。对于简单任务,传统的LangChain Chain仍是更轻量级的方案。

Langgraph常见示例场景:

基础工作流构建:线性工作流  顺序执行
from langgraph.graph import StateGraph, END, START
from typing import TypedDict# 定义状态类型
class State(TypedDict):input: stroutput: str# 定义节点函数
def node1(state: State):return {"output": state["input"].upper()}def node2(state: State):return {"output": state["output"] + "!"}# 构建工作流
workflow = StateGraph(State)
workflow.add_node("uppercase", node1)
workflow.add_node("add_exclamation", node2)# 设置执行顺序
workflow.add_edge("uppercase", "add_exclamation")
workflow.add_edge("add_exclamation", END)workflow.add_edge(START, "uppercase")
# 编译并运行
app = workflow.compile()
result = app.invoke({"input": "hello"})
print(result["output"])# 运行结果: HELLO!
条件分支: 根据状态选择动态路由
from typing import Literal, TypedDict
from langgraph.graph import StateGraph, START, ENDclass BranchState(TypedDict):value: intpath: Literal["even", "odd"]def process_number(state: BranchState):return {"value": state["value"] * 2}def even_path(state: BranchState):return {"path": "even"}def odd_path(state: BranchState):return {"path": "odd"}workflow = StateGraph(BranchState)
workflow.add_node("process", process_number)
workflow.add_node("even", even_path)
workflow.add_node("odd", odd_path)# 条件分支函数
def decide_path(state: BranchState):return "even" if state["value"] % 2 == 0 else "odd"workflow.add_conditional_edges("process",decide_path,{"even": "even", "odd": "odd"}  # 分支映射
)
workflow.add_edge("even", END)
workflow.add_edge("odd", END)
workflow.add_edge(START,'process')app = workflow.compile()
# 测试
print( app.invoke({"value": 3}))
print( app.invoke({"value": 3.1}))# 输出:
# {"value": 6, "path": "even"}
# {'value': 6.2, 'path': 'odd'}
循环控制,固定次数循环
from typing import Literal, TypedDict
from langgraph.graph import StateGraph, START, ENDclass LoopState(TypedDict):counter: intmax_count: intdef increment(state: LoopState):return {"counter": state["counter"] + 1}def should_continue(state: LoopState):return "continue" if state["counter"] < state["max_count"] else "end"workflow = StateGraph(LoopState)
workflow.add_node("increment", increment)# 循环条件
workflow.add_conditional_edges("increment",should_continue,{"continue": "increment", "end": END}
)
workflow.add_edge(START, 'increment')
app = workflow.compile()
result = app.invoke({"counter": 0, "max_count": 3})
print(result["counter"])  
# 输出: 3
并行处理: 多节点并行处理
from typing import Literal, TypedDict, List
from langgraph.graph import StateGraph, START, ENDclass ParallelState(TypedDict):inputs: List[str]results: List[str]def task_a(state: ParallelState):return {"results": [s.upper() for s in state["inputs"]]}def task_b(state: ParallelState):return {"results": [s.lower() for s in state["inputs"]]}def combine(state: ParallelState):return {"results": [f"A:{a}, B:{b}" for a, b in zip(state["results"], state["results"])]}workflow = StateGraph(ParallelState)
workflow.add_node("upper", task_a)
workflow.add_node("lower", task_b)
workflow.add_node("combine", combine)# 并行执行 A 和 B
workflow.add_edge("upper", "combine")
workflow.add_edge("lower", "combine")
workflow.add_edge("combine", END)
workflow.add_edge(START,'upper')
app = workflow.compile()
result = app.invoke({"inputs": ["Hello", "World"]})
print(result)
# 运行结果:
# {'inputs': ['Hello', 'World'], 'results': ['A:HELLO, B:HELLO', 'A:WORLD, B:WORLD']}
消息传递: 聊天对话流
from typing import Literal, TypedDict, List
from langgraph.graph import StateGraph, START, END
from langgraph.graph import MessageGraph
from langchain_core.messages import HumanMessage, AIMessagedef responder(messages: List[HumanMessage]):last_msg = messages[-1].contentreturn AIMessage(content=f"你说了: {last_msg}")graph = MessageGraph()
graph.add_node("responder", responder)
graph.add_edge("responder", END)
graph.set_entry_point("responder")app = graph.compile()
result = app.invoke(HumanMessage(content="你好!"))
print(result[-1].content)
# 输出: 你说了: 你好!
错误处理:捕获并处理异常
from typing import Literal, TypedDict, List
from langgraph.graph import StateGraph, START, ENDclass ErrorState(TypedDict):input: stroutput: strerror: strdef risky_task(state: ErrorState):try:if "fail" in state["input"]:raise ValueError("模拟错误")return {"output": state["input"].upper()}except Exception as e:return {"error": str(e)}def handle_error(state: ErrorState):return {"output": f"错误处理: {state['error']}"}workflow = StateGraph(ErrorState)
workflow.add_node("task", risky_task)
workflow.add_node("handler", handle_error)def check_error(state: ErrorState):return "handler" if "error" in state else ENDworkflow.add_conditional_edges("task",check_error,{"handler": "handler", "end": END}
)
workflow.add_edge(START,'task')workflow.add_edge("handler", END)app = workflow.compile()
result = app.invoke({"input": "fail"})
print(result["output"])
# 运行结果: 错误处理: 模拟错误
关键点总结

功能

核心方法/类

典型应用场景

线性流程

add_node(),add_edge()

简单顺序任务

条件分支

add_conditional_edges()

动态路由决策

循环控制

条件边 + 状态判断

迭代处理

并行执行

多入口边

独立任务并行

工具调用

ToolNode

集成外部工具(搜索/API)

消息传递

MessageGraph

多轮对话系统

错误处理

try-catch + 条件边

容错工作流

 

构建工作流来调用工具,来访问本地LLM

执行流程: 初始化Ollama 大模型 -> 定义状态类型 -> 定义工具函数 -> 定义节点

-> 构建工作流(添加节点,设置入口点,设置条件路由,设置终止边) -> 编译测试

from langgraph.graph import StateGraph, START, END
from langchain_community.chat_models import ChatOllama
from typing import TypedDict, Literal, Any# 1. 初始化Ollama
llm = ChatOllama(model="qwen", temperature=0.7)# 2. 定义状态类型
class QAState(TypedDict):question: stranswer: strneeds_tool: Literal["yes", "no"]# 3. 定义工具函数
def calculator(expression: str) -> str | Any:"""计算数学表达式"""try:return eval(expression)  # 生产环境应使用更安全的方式except:return "计算错误"# 4. 定义节点函数
def decide_tool_use(state: QAState):"""判断是否需要工具"""question = state["question"]needs_tool = "yes" if "计算" in question else "no"return {"needs_tool": needs_tool}def call_tool(state: QAState):"""调用计算工具"""expr = state["question"].split("计算")[1].strip()result = calculator(expr)return {"answer": f"计算结果: {result}"}def generate_directly(state: QAState):"""直接生成回答"""response = llm.invoke(state["question"])return {"answer": response.content}# 5. 构建工作流
workflow = StateGraph(QAState)# 添加节点
workflow.add_node("decide", decide_tool_use)
workflow.add_node("use_tool", call_tool)
workflow.add_node("generate", generate_directly)# 设置入口点(关键修正)
workflow.set_entry_point("decide")
# 替代set_entry_point的方式
# workflow.add_edge(START, "decide")  # 明确指定从START到decide节点# 设置条件路由
def route(state: QAState):if state["needs_tool"] == "yes":return "use_tool"return "generate"workflow.add_conditional_edges("decide",route,{"use_tool": "use_tool", "generate": "generate"}
)# 设置终止边
workflow.add_edge("use_tool", END)
workflow.add_edge("generate", END)
# 6. 编译并测试
app = workflow.compile()
# 测试工具调用
result = app.invoke({"question": "请计算3.14 * 5"})
print(result["answer"])  # 输出: 计算结果: 15.7# 测试直接生成
result = app.invoke({"question": "解释量子计算"})
print(result["answer"])  # 输出LLM生成的解释#####################################
运行结果:
计算结果: 15.700000000000001
计算结果: 计算错误
可视化工作流  调试建议
app = workflow.compile()
后面添加get_graph
print(app.get_graph().draw_mermaid())
---
config:flowchart:curve: linear
---
graph TD;__start__([<p>__start__</p>]):::firstdecide(decide)use_tool(use_tool)generate(generate)__end__([<p>__end__</p>]):::last__start__ --> decide;decide -.-> generate;decide -.-> use_tool;generate --> __end__;use_tool --> __end__;classDef default fill:#f2f0ff,line-height:1.2classDef first fill-opacity:0classDef last fill:#bfb6fc
调用工具访问LLM 案例

(当用户需要询问最新新闻或天气,AI大模型不能准确回答需要借助第三方工具来获取信息,这时需要添加条件边来做判断)

当需要用工具时调用第三方工具,不需要时直接回答用户问题

  1. 使用 ChatOllama 包装本地模型
  2. 决策节点判断是否需要外部搜索
  3. 条件路由控制是否调用工具
  4. 最终由LLM生成回答
from langgraph.graph import StateGraph, END,START
from langchain_community.chat_models import ChatOllama
from langchain_core.prompts import ChatPromptTemplate
from typing import TypedDict, List, Literal# 1. 初始化Ollama模型
llm = ChatOllama(model="qwen",  # 使用本地已下载的模型temperature=0.7,base_url="http://localhost:11434"  # Ollama默认地址
)# 2. 定义状态结构
class QAState(TypedDict):question: strcontext: stranswer: strneeds_search: Literal["yes", "no"]# 3. 定义工具函数(模拟网络搜索)
def web_search(query: str) -> str:"""模拟网络搜索工具"""print(f"正在搜索: {query}")# 这里实际可以接入SerpAPI/Tavily等真实搜索工具mock_results = {"北京最新的新闻是什么?": "今日AI大会在北京召开...","天气情况": "北京明天晴转多云,25-32℃","技术问题": "LangGraph是LangChain的工作流编排工具..."}return mock_results.get(query, "未找到相关信息")# 4. 定义工作流节点
def decide_need_search(state: QAState):"""判断是否需要搜索"""question = state["question"]# 简单逻辑:问题包含"最新"或"天气"时需要搜索needs_search = "yes" if any(kw in question for kw in ["最新", "天气"]) else "no"return {"needs_search": needs_search}def retrieve_info(state: QAState):"""调用搜索工具获取信息"""search_result = web_search(state["question"])return {"context": search_result}def generate_answer(state: QAState):"""使用LLM生成回答"""prompt = ChatPromptTemplate.from_messages([("system", "你是一个智能助手,请根据以下上下文回答问题。"),("human", "问题: {question}\n上下文: {context}")])chain = prompt | llmresponse = chain.invoke({"question": state["question"],"context": state.get("context", "无额外上下文")})return {"answer": response.content}# 5. 构建工作流
workflow = StateGraph(QAState)# 添加节点
workflow.add_node("decide", decide_need_search)
workflow.add_node("search", retrieve_info)
workflow.add_node("answer", generate_answer)# 设置条件路由
def route_based_on_decision(state: QAState):if state["needs_search"] == "yes":return "search"return "answer"workflow.add_conditional_edges("decide",route_based_on_decision,{"search": "search", "answer": "answer"}
)# 设置边
workflow.add_edge("search", "answer")
workflow.add_edge("answer", END)# 设置入口点
workflow.set_entry_point("decide")# 6. 编译工作流
app = workflow.compile()# 7. 测试不同场景
def run_qa(question):print(f"\n问题: {question}")result = app.invoke({"question": question, "context": ""})print("回答:", result["answer"])# 需要搜索的问题
run_qa("北京最新的新闻是什么?")# 不需要搜索的问题
run_qa("请用中文解释量子计算")# 运行结果:
# 正在搜索: 北京最新的新闻是什么?
# 回答: 北京最新新闻是关于今日在北京召开的AI大会的相关信息。# 问题: 请用中文解释量子计算
# 回答: 量子计算是一种利用量子力学原理... (直接由LLM生成)
多轮对话系统案例
from typing import TypedDict, List
from langchain_community.chat_models import ChatOllama  # 更推荐的导入方式
from langgraph.graph import StateGraph,END
from langchain_core.prompts import ChatPromptTemplate  # 添加prompt模板# 1. 定义增强的状态结构
class QAState(TypedDict):question: stranswer: strcontext: strhistory: List[dict]  # 添加对话历史记录# 2. 初始化模型(带错误处理)
try:llm = ChatOllama(model="qwen",temperature=0.7,base_url="http://localhost:11434"  # 显式指定地址)
except Exception as e:raise RuntimeError(f"Ollama初始化失败,请确保服务已运行: {str(e)}")# 3. 使用Prompt模板优化回答质量
prompt_template = ChatPromptTemplate.from_messages([("system", "你是一个专业的HR助手,请根据提供的上下文回答问题。"),("human", "上下文: {context}\n问题: {question}")
])# 4. 优化后的节点函数
def retrieve(state: QAState):"""检索上下文(模拟RAG场景)"""try:# 实际项目中这里应该是向量库检索simulated_context = llm.invoke(f"生成关于'{state['question']}'的3条相关知识要点").contentreturn {"context": simulated_context,"history": state["history"] + [{"type": "retrieve", "content": simulated_context}]}except Exception as e:return {"context": f"检索错误: {str(e)}"}def generate(state: QAState):"""生成最终回答"""try:chain = prompt_template | llmresponse = chain.invoke({"context": state["context"],"question": state["question"]})return {"answer": response.content,"history": state["history"] + [{"type": "answer", "content": response.content}]}except Exception as e:return {"answer": f"生成回答时出错: {str(e)}"}# 5. 构建健壮的工作流
workflow = StateGraph(QAState)# 添加节点(带节点名称校验)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)# 设置执行流程
workflow.add_edge("retrieve", "generate")
workflow.set_entry_point("retrieve")  # 明确入口
workflow.add_edge("generate", END)  # 明确出口# 6. 编译前验证
if not workflow.nodes:raise ValueError("工作流没有添加任何节点")# 7. 带错误处理的应用执行
try:app = workflow.compile()# 测试用例test_cases = [{"question": "如何申请年假?", "context": "", "history": []},{"question": "年假最少可以请几天?", "context": "", "history": []}]for case in test_cases:print(f"\n问题: {case['question']}")result = app.invoke(case)print(f"回答: {result['answer']}")print(f"上下文: {result['context'][:100]}...")  # 显示部分上下文print(f"历史记录: {len(result['history'])}条")except Exception as e:print(f"工作流执行失败: {str(e)}")# 运行结果:
"""
问题: 如何申请年假?
回答: 要申请年假,一般需要提前向公司提交年假申请表,并附上相关的证明材料,如工作证、身份证等。
上下文: 1. 在中国,年假是指员工在一年内休假的时间。一般来说,公司会根据员工的工作年限来决定年假的天数。2. 要申请年假,一般需要提前向公司提交年假申请表,并附上相关的证明材料,如工作证、身份证等。...
历史记录: 2条问题: 年假最少可以请几天?
回答: 年假最少可以请7天。
上下文: 1. 年假是员工在一年中享有的带薪休假。在中国,年假最长可以请7天。2. 年假的计算方式因国家和公司而异。在中国,年假的计算通常基于员工的工作年限和每年的年假天数来确定。3. 在中国的劳动法中...
历史记录: 2条"""
LangGraph多代理协作中实现固定循环次数后自动停止

用户提问 → 监督代理路由 → 专业代理回答 → 计数器+1

from typing import TypedDict, Annotated, Sequence
from langgraph.graph import StateGraph, END
from langchain_community.chat_models import ChatOllama
from langchain_core.messages import HumanMessage, BaseMessage
import operator# 1. 扩展状态结构(增加循环计数器)
class AgentState(TypedDict):messages: Annotated[Sequence[BaseMessage], operator.add]  # 消息历史next: str  # 下一节点标记loop_count: int  # 新增:循环计数器# 2. 初始化Ollama模型
llm = ChatOllama(model="qwen")# 3. 代理函数(增加计数器更新)
def tech_agent(state: AgentState) -> dict:response = llm.invoke("技术专家回答:" + state["messages"][-1].content)return {"messages": [response],"next": "supervisor","loop_count": state["loop_count"] + 1  # 计数器+1}def life_agent(state: AgentState) -> dict:response = llm.invoke("生活顾问回答:" + state["messages"][-1].content)return {"messages": [response],"next": "supervisor","loop_count": state["loop_count"] + 1}# 4. 监督代理(检查循环次数)
def supervisor(state: AgentState) -> dict:# 如果达到3次循环则终止if state["loop_count"] >= 3:return {"next": "FINISH", "loop_count": state["loop_count"]}# 正常路由逻辑last_msg = state["messages"][-1].content.lower()if any(k in last_msg for k in ["python", "代码"]):return {"next": "tech_agent", "loop_count": state["loop_count"]}elif any(k in last_msg for k in ["饮食", "健康"]):return {"next": "life_agent", "loop_count": state["loop_count"]}else:return {"next": "FINISH", "loop_count": state["loop_count"]}# 5. 构建工作流
workflow = StateGraph(AgentState)
workflow.add_node("tech_agent", tech_agent)
workflow.add_node("life_agent", life_agent)
workflow.add_node("supervisor", supervisor)workflow.add_edge("tech_agent", "supervisor")
workflow.add_edge("life_agent", "supervisor")
workflow.add_conditional_edges("supervisor",lambda state: state["next"],{"tech_agent": "tech_agent","life_agent": "life_agent","FINISH": END}
)
workflow.set_entry_point("supervisor")# 6. 编译运行(初始化计数器为0)
app = workflow.compile()# 测试(会自动在3次循环后停止)
result = app.invoke({"messages": [HumanMessage(content="Python的GIL是什么?")],"next": "supervisor","loop_count": 0  # 初始计数器
})print("最终结果:", result["messages"][-1].content)
print("循环次数:", result["loop_count"])

http://www.lryc.cn/news/578603.html

相关文章:

  • 位运算经典题解
  • python+uniapp基于微信小程序的流浪动物救助领养系统nodejs+java
  • 用 YOLOv8 + DeepSORT 实现目标检测、追踪与速度估算
  • SeaTunnel 社区 2 项目中选“开源之夏 2025”,探索高阶数据集成能力!
  • 华为设备 QoS 流分类与流标记深度解析及实验脚本
  • flv.js视频/直播流测试demo
  • 欢乐熊大话蓝牙知识24:LE Secure Connections 是 BLE 的安全升级术
  • 视频内存太大怎么压缩变小一点?视频压缩的常用方法
  • Nginx重定向协议冲突解决方案:The plain HTTP request was sent to HTTPS port
  • Apache HTTP Server部署全攻略
  • 第八十六篇 大数据排序算法:从厨房整理到分布式排序的智慧
  • DBA 命令全面指南:核心操作、语法与最佳实践
  • 爱回收平台接口开发指南
  • 变幻莫测:CoreData 中 Transformable 类型面面俱到(七)
  • 打造 AI 产品的前端架构:响应式、流式、智能交互三合一
  • 基于SSM万华城市货运服务系统的设计与实现
  • OpenCV CUDA模块设备层-----反向二值化阈值处理函数thresh_binary_inv_func()
  • Python学习Day48
  • golang generic 2022-04-13
  • 技术学习_人工智能_1_神经网络是如何实现的?
  • IDE全家桶专用快捷键----------个人独家分享!!
  • 02.SpringBoot常用Utils工具类详解
  • pytorch学习—7.处理多维特征的输入
  • 通达信【极弱强势指标与股道波段交易系统】幅图
  • 【学习笔记】Python中主函数调用的方式
  • 修改Spatial-MLLM项目,使其专注于无人机航拍视频的空间理解
  • Electron 应用中的内容安全策略 (CSP) 全面指南
  • AbMole| H₂DCFDA(M9096;活性氧(ROS)探针)
  • 医学编码:临床试验数据标准化的关键
  • Next.js 安装使用教程