AI智能体——OpenManus 源码学习
1、OpenManus 整体架构
1.1、agent 目录
agent 目录是 OpenManus 实现的核心,采用了分层的代理架构,不同层次的代理负责不同的功能,这样更利于系统的扩展。
OpenManus 的代理架构主要包含以下几层:
- BaseAgent:最基础的代理抽象类,定义了所有代理的基本状态管理和执行循环
- ReActAgent:实现 ReAct 模式的代理,具有思考(Think)和行动(Act)两个主要步骤
- ToolCallAgent:能够调用工具的代理,继承自 ReActAgent 并扩展了工具调用能力
- Manus:具体实现的智能体实例,集成了所有能力并添加了更多专业工具
1.2、tool 目录
tool 目录定义了各种各样的工具,比如网页搜索、文件操作、询求用户帮助、代码执行器等等。
1.3、prompt 目录
prompt 目录定义了整个项目中可能会用到的提示词。
1.4、其他支持
为了实现完整的智能体功能,OpenManus 依赖以下关键组件:
- 记忆系统:使用 Memory 类存储对话历史和中间状态
- LLM 大模型:通过 LLM 类提供思考和决策能力
- 工具系统:提供 BaseTool 和 ToolCollection 类扩展智能体的能力边界
- 流程控制:通过 AgentState 和执行循环管理状态转换和任务流程
2、AI 智能体核心实现
2.1、BaseAgent
BaseAgent 是所有代理的基础,定义了代理状态管理和执行循环的核心逻辑。其关键代码就是 Agent Loop 的实现,通过 while 实现循环,并且定义了死循环检查机制:
class BaseAgent(BaseModel, ABC): async def run(self, request: Optional[str] = None) -> str: """执行代理的主循环""" if self.state != AgentState.IDLE: raise RuntimeError(f"Cannot run agent from state: {self.state}") if request: self.update_memory("user", request) results: List[str] = [] async with self.state_context(AgentState.RUNNING): while (self.current_step < self.max_steps and self.state != AgentState.FINISHED): self.current_step += 1 step_result = await self.step() # 检查是否陷入循环 if self.is_stuck(): self.handle_stuck_state() results.append(f"Step {self.current_step}: {step_result}") if self.current_step >= self.max_steps: self.current_step = 0 self.state = AgentState.IDLE results.append(f"Terminated: Reached max steps ({self.max_steps})") return "\n".join(results) if results else "No steps executed" @abstractmethod async def step(self) -> str: """执行单步操作,必须由子类实现"""
分析:
BaseAgent 类 通过 run() 方法为子类提供了一个标准的代理执行框架。该方法保证了:
- 只有在代理处于空闲状态时才会执行任务。
- 代理执行过程中每一步会被记录。
- 如果在执行过程中代理卡住,会进行处理。
- 最多执行 max_steps 步,并在达到限制时终止。
子类 必须实现 step() 方法,定义具体的单步操作,确保代理在每一步执行时的具体行为。
这里使用了模板方法设计模式,父类定义执行流程,具体的执行方法(step)交给子类实现。
2.2、ReActAgent
ReActAgent 实现了 ReAct 模式,将代理的执行过程分为思考(Think)和行动(Act)两个关键步骤。查看 react.py 文件:
class ReActAgent(BaseAgent, ABC): @abstractmethod async def think(self) -> bool: """处理当前状态并决定下一步行动""" @abstractmethod async def act(self) -> str: """执行决定的行动""" async def step(self) -> str: """执行单步:思考和行动""" should_act = await self.think() if not should_act: return "Thinking complete - no action needed" return await self.act()
上述代码同样运用了模板方法设计模式,这种设计体现了 ReAct 模式的核心思想,也就是 “思考 - 行动 - 观察” 的循环过程。但是具体怎么思考、怎么行动,交给子类去实现。
2.3、ToolCallAgent
ToolCallAgent 在 ReAct 模式的基础上增加了工具调用能力,是 OpenManus 最重要的一个层次。查看 toolcall.py 文件,虽然代码比较复杂,但原理很简单,就是工具调用机制的具体实现:
- think:和 AI 交互思考使用什么工具
- act:程序执行工具
- observe:将结果返回给 AI
class ToolCallAgent(ReActAgent): """能够执行工具调用的代理类""" available_tools: ToolCollection = ToolCollection( CreateChatCompletion(), Terminate() ) tool_choices: TOOL_CHOICE_TYPE = ToolChoice.AUTO special_tool_names: List[str] = Field(default_factory=lambda: [Terminate().name]) async def think(self) -> bool: """处理当前状态并使用工具决定下一步行动""" # 添加下一步提示到用户消息 if self.next_step_prompt: user_msg = Message.user_message(self.next_step_prompt) self.messages += [user_msg] # 请求 LLM 选择工具 response = await self.llm.ask_tool( messages=self.messages, system_msgs=([Message.system_message(self.system_prompt)] if self.system_prompt else None), tools=self.available_tools.to_params(), tool_choice=self.tool_choices, ) # 处理工具调用 self.tool_calls = tool_calls = ( response.tool_calls if response and response.tool_calls else [] ) content = response.content if response and response.content else "" # 添加助手消息到记忆 assistant_msg = ( Message.from_tool_calls(content=content, tool_calls=self.tool_calls) if self.tool_calls else Message.assistant_message(content) ) self.memory.add_message(assistant_msg) # 决定是否应该执行行动 return bool(self.tool_calls or content) async def act(self) -> str: """执行工具调用并处理结果""" if not self.tool_calls: # 返回最后一条消息内容,如果没有工具调用 return self.messages[-1].content or "No content or commands to execute" results = [] for command in self.tool_calls: # 执行工具 result = await self.execute_tool(command) # 记录工具响应到记忆 tool_msg = Message.tool_message( content=result, tool_call_id=command.id, name=command.function.name, base64_image=self._current_base64_image, ) self.memory.add_message(tool_msg) results.append(result) return "\n\n".join(results)
总结:
ToolCallAgent 是一个继承自 ReActAgent 的类,它能够执行工具调用并处理结果。它通过 think() 方法决定是否需要调用工具,并通过 act() 方法执行工具并将结果记录到记忆中。主要用于需要根据不同情境选择并执行工具的场景。
2.4、Manus
Manus 类是 OpenManus 的核心智能体实例,集成了各种工具和能力。
class Manus(ToolCallAgent): """多功能通用智能体,支持本地和 MCP 工具""" name: str = "Manus" description: str = "A versatile agent that can solve various tasks using multiple tools" # 添加各种通用工具到工具集合 available_tools: ToolCollection = Field( default_factory=lambda: ToolCollection( PythonExecute(), BrowserUseTool(), StrReplaceEditor(), AskHuman(), Terminate(), ) )
2.5、总结
1、BaseAgent:最基础的智能体行为框架
功能:提供通用的运行控制逻辑(run())这是所有智能体的基类,核心功能是:
- 控制生命周期状态(AgentState.IDLE -> RUNNING -> FINISHED)
- 控制最大步数(max_steps)避免无限循环
- 执行主循环 run(),里面逐步执行 step()
- 提供一个抽象方法 step(),由子类实现
2、 ReActAgent:具备“思考+行动”能力的抽象智能体
功能:实现 step = think + act 模型(React Pattern)
这个类是对 BaseAgent 的扩展,引入了智能体经典的两个阶段:
- think():思考当前情况,决定是否需要执行行动(返回布尔值)
- act():真正执行行动的逻辑
3、ToolCallAgent:可以自动调用工具的智能体
功能:集成 LLM 选择工具 + 执行工具 + 记录工具调用结果
它继承自 ReActAgent,重点实现了:
think() 阶段:
- 构造用户提示、系统提示等消息上下文
- 调用 llm.ask_tool() 请求模型选择工具
- 如果返回了工具调用或纯内容,则存入记忆(memory)
act() 阶段:
- 遍历工具调用列表(tool_calls)
- 用 execute_tool() 执行每个调用
- 结果打包为 tool_message,加入记忆
- 返回所有工具执行结果
4、Manus:一个具体的、强能力 Agent 实例
功能:组合多个强大的工具,比如浏览器、Python 执行器、编辑器等
它本质上就是 ToolCallAgent 的一个“实例化 + 工具配置”版本,适用于多用途智能体:
总体架构总结:
classDiagram
BaseAgent <|-- ReActAgent
ReActAgent <|-- ToolCallAgent
ToolCallAgent <|-- ManusBaseAgent : +run()
BaseAgent : +step() [abstract]ReActAgent : +think() [abstract]
ReActAgent : +act() [abstract]
ReActAgent : +step() = think + actToolCallAgent : +think() = 调用LLM决策工具
ToolCallAgent : +act() =
调用工具并记录结果Manus : available_tools = PythonExecute, BrowserUse, …
3、关键实现细节
3.1、工具系统设计
3.1.1、工具抽象层 BaseTool
所有工具均继承自 BaseTool 抽象基类,提供统一的接口和行为:
class BaseTool(ABC, BaseModel): name: str description: str parameters: Optional[dict] = None async def __call__(self, **kwargs) -> Any: """使用给定参数执行工具""" return await self.execute(**kwargs) @abstractmethod async def execute(self, **kwargs) -> Any: """执行工具的具体逻辑,由子类实现""" def to_param(self) -> Dict: """将工具转换为函数调用格式""" return { "type": "function", "function": { "name": self.name, "description": self.description, "parameters": self.parameters, }, }
这种设计使得每个工具都有统一的调用方式,同时具有规范化的参数描述,便于 LLM 理解工具的使用方法。
3.1.2、终止工具 Terminate
Terminate 工具是一个特殊的工具,允许智能体通过 AI 大模型自主决定何时结束任务,避免无限循环或者过早结束。
class Terminate(BaseTool): name: str = "terminate" description: str = """Terminate the interaction when the request is met OR if the assistant cannot proceed further with the task. When you have finished all the tasks, call this tool to end the work.""" parameters: dict = { "type": "object", "properties": { "status": { "type": "string", "description": "The finish status of the interaction.", "enum": ["success", "failure"], } }, "required": ["status"], } async def execute(self, status: str) -> str: """完成当前执行""" return f"The interaction has been completed with status: {status}"
3.1.3、询问工具 AskHumanm
AskHuman 工具允许智能体在遇到无法自主解决的问题时向人类寻求帮助,也就是给用户一个输入框,让我们能够更好地干预智能体完成任务的过程。
class AskHuman(BaseTool): """Add a tool to ask human for help.""" name: str = "ask_human" description: str = "Use this tool to ask human for help." parameters: str = { "type": "object", "properties": { "inquire": { "type": "string", "description": "The question you want to ask human.", } }, "required": ["inquire"], } async def execute(self, inquire: str) -> str: return input(f"""Bot: {inquire}\n\nYou: """).strip()
这个工具实现虽然简单,但极大地提升了智能体的实用性和安全性。
3.1.4、工具集合 ToolCollection
OpenManus 设计了 ToolCollection 类来管理多个工具实例,提供统一的工具注册和执行接口:
class ToolCollection: """A collection of defined tools.""" def __init__(self, *tools: BaseTool): self.tools = tools self.tool_map = {tool.name: tool for tool in tools} def to_params(self) -> List[Dict[str, Any]]: return [tool.to_param() for tool in self.tools] async def execute(self, *, name: str, tool_input: Dict[str, Any] = None) -> ToolResult: tool = self.tool_map.get(name) if not tool: return ToolFailure(error=f"Tool {name} is invalid") try: result = await tool(**tool_input) return result except ToolError as e: return ToolFailure(error=e.message) def add_tools(self, *tools: BaseTool): """Add multiple tools to the collection.""" for tool in tools: self.add_tool(tool) return self
这种设计使得 OpenManus 可以灵活地添加、移除和管理工具,实现了工具系统的可插拔性。
3.2、MCP 协议支持
3.2.1、MCP 与工具系统的集成
OpenManus 的实现也是遵循了这一思想。通过 MCPClients 类(继承自 ToolCollection)将 MCP 服务集成到现有工具系统中。
class MCPClients(ToolCollection): """ A collection of tools that connects to multiple MCP servers and manages available tools through the Model Context Protocol. """ sessions: Dict[str, ClientSession] = {} exit_stacks: Dict[str, AsyncExitStack] = {} description: str = "MCP client tools for server interaction" def __init__(self): super().__init__() # Initialize with empty tools list self.name = "mcp" # Keep name for backward compatibility
3.2.2、动态工具代理
每当连接到 MCP 服务器时,OpenManus 会动态创建 MCPClientTool 实例(继承自 BaseTool)作为每个远程工具的代理。
class MCPClientTool(BaseTool): """Represents a tool proxy that can be called on the MCP server from the client side.""" session: Optional[ClientSession] = None server_id: str = "" # Add server identifier original_name: str = "" async def execute(self, **kwargs) -> ToolResult: """Execute the tool by making a remote call to the MCP server.""" if not self.session: return ToolResult(error="Not connected to MCP server") try: result = await self.session.call_tool(self.original_name, kwargs) content_str = ", ".join( item.text for item in result.content if isinstance(item, TextContent) ) return ToolResult(output=content_str or "No output returned.") except Exception as e: return ToolResult(error=f"Error executing tool: {str(e)}")
3.2.3、Manus 中的 MCP 集成机制
Manus 智能体 通过工具调用 实现了与 MCP 服务器的无缝集成
async def connect_mcp_server( self, server_url: str, server_id: str = "", use_stdio: bool = False, stdio_args: List[str] = None,
) -> None: """Connect to an MCP server and add its tools.""" if use_stdio: await self.mcp_clients.connect_stdio(server_url, stdio_args or [], server_id) else: await self.mcp_clients.connect_sse(server_url, server_id) # 关键实现:动态添加该服务器的工具到可用工具集合 new_tools = [ tool for tool in self.mcp_clients.tools if tool.server_id == server_id ] self.available_tools.add_tools(*new_tools)
4、其他值得学习的源码
4.1、Python 代码执行沙箱
PythonExecute 工具实现了一个安全的 Python 代码执行环境,这是一个值得学习的安全实现。
class PythonExecute(BaseTool): name: str = "python_execute" async def execute(self, code: str, timeout: int = 5) -> Dict: """安全执行 Python 代码""" with multiprocessing.Manager() as manager: result = manager.dict({"observation": "", "success": False}) # 安全的全局环境 safe_globals = {"__builtins__": __builtins__.__dict__.copy()} # 使用独立进程执行代码 proc = multiprocessing.Process( target=self._run_code, args=(code, result, safe_globals) ) proc.start() proc.join(timeout) # 超时处理 if proc.is_alive(): proc.terminate() proc.join(1) return { "observation": f"Execution timeout after {timeout} seconds", "success": False, } return dict(result)
这段代码展示了几个安全编程的最佳实践:
- 使用独立进程隔离代码执行
- 实现了超时机制防止无限循环
- 截获和处理所有异常
- 重定向标准输出以捕获打印内容
4.2、状态管理与上下文切换机制
BaseAgent 实现了一个优雅的状态管理和上下文切换机制:
@asynccontextmanager
async def state_context(self, new_state: AgentState): """Context manager for safe agent state transitions.""" if not isinstance(new_state, AgentState): raise ValueError(f"Invalid state: {new_state}") previous_state = self.state self.state = new_state try: yield except Exception as e: self.state = AgentState.ERROR # Transition to ERROR on failure raise e finally: self.state = previous_state # Revert to previous state
这个上下文管理器确保了状态转换的安全性和可靠性,即使在异常情况下也能正确恢复状态,是一个值得学习的设计模式。
4.3、工具结果统一表示与组合
OpenManus 设计了 ToolResult 类来统一表示工具执行结果,并支持结果组合
class ToolResult(BaseModel): """Represents the result of a tool execution.""" output: Any = Field(default=None) error: Optional[str] = Field(default=None) base64_image: Optional[str] = Field(default=None) system: Optional[str] = Field(default=None) def __add__(self, other: "ToolResult"): """组合两个工具结果""" def combine_fields(field: Optional[str], other_field: Optional[str], concatenate: bool = True): if field and other_field: if concatenate: return field + other_field raise ValueError("Cannot combine tool results") return field or other_field return ToolResult( output=combine_fields(self.output, other.output), error=combine_fields(self.error, other.error), base64_image=combine_fields(self.base64_image, other.base64_image, False), system=combine_fields(self.system, other.system), )
这种设计使得工具结果处理更加统一和灵活,特别是在需要组合多个工具结果或处理异常情况时。