Hands-On OpenHands Study Notes 8: Backend Service Development
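1. Introduction
Backend services are the technical foundation of an AI agent system: they handle business logic, state management, and external integrations. These notes review API design and service architecture theory, analyze the characteristics of the OpenHands backend design, and then build a backend service module that simulates the core functionality of OpenHands.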
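2. API Design and Service Architecture Theory
2.1 API Design Principles
- RESTful design: resource-oriented URLs and HTTP method semantics
- GraphQL: declarative data queries that reduce the number of round trips
- API versioning: in the path, in a header, or in a query parameter
- Status codes: use HTTP status codes that accurately express the outcome
- Idempotency: sending the same request several times has the same effect as sending it once
- Security: authentication, authorization, and input validation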
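Idempotency is the principle that most benefits from a concrete example. The following is a minimal in-memory sketch, assuming a client-supplied idempotency key; `createOrder` and the stored-response map are hypothetical names chosen for illustration and are not part of OpenHands.

```javascript
// Minimal in-memory sketch of idempotent request handling.
// `createOrder` and the idempotency-key convention are illustrative assumptions.
const processed = new Map(); // idempotencyKey -> stored response
let nextOrderId = 1;

function createOrder(idempotencyKey, payload) {
  if (processed.has(idempotencyKey)) {
    // Replay: return the stored body, with 200 instead of 201 to signal "already done".
    return { ...processed.get(idempotencyKey), status: 200 };
  }
  const response = { status: 201, body: { id: nextOrderId++, ...payload } };
  processed.set(idempotencyKey, response);
  return response;
}

const first = createOrder('key-1', { item: 'book' });
const retry = createOrder('key-1', { item: 'book' });
console.log(first.body.id === retry.body.id);
```

A retried request with the same key returns the original resource instead of creating a second one, which is what makes retries safe for non-idempotent methods such as POST.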
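2.2 Service Architecture Patterns
- Monolithic architecture:
  - All functionality lives in a single service
  - Simple to deploy; suits small applications
- Microservice architecture:
  - Services are split by business capability
  - Each service is deployed and scaled independently
  - Different services can use different technology stacks
- Serverless architecture:
  - Event-driven
  - Scales automatically
  - Pay per use
- Layered architecture:
  - Presentation, business logic, and data access layers
  - Separation of concerns
  - Easier to maintain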
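The layered pattern can be sketched in a few lines. This is an illustrative example only: `NoteRepository`, `NoteService`, and `handleRequest` are hypothetical names, and the request and response objects are plain objects rather than a real framework's types.

```javascript
// Data access layer: the only place that knows how notes are stored.
class NoteRepository {
  constructor() { this.rows = new Map(); }
  save(note) { this.rows.set(note.id, note); return note; }
  findById(id) { return this.rows.get(id) ?? null; }
}

// Business logic layer: validation and rules, no HTTP or storage details.
class NoteService {
  constructor(repository) { this.repository = repository; }
  create(id, text) {
    if (typeof text !== 'string' || text.trim() === '') {
      throw new Error('text must be a non-empty string');
    }
    return this.repository.save({ id, text: text.trim() });
  }
  get(id) { return this.repository.findById(id); }
}

// Presentation layer: turns requests into service calls and results into responses.
function handleRequest(service, request) {
  try {
    if (request.method === 'POST') {
      return { status: 201, body: service.create(request.id, request.text) };
    }
    const note = service.get(request.id);
    return note ? { status: 200, body: note } : { status: 404, body: null };
  } catch (error) {
    return { status: 400, body: { error: error.message } };
  }
}

const service = new NoteService(new NoteRepository());
const created = handleRequest(service, { method: 'POST', id: 'n1', text: ' hello ' });
const missing = handleRequest(service, { method: 'GET', id: 'n2' });
const invalid = handleRequest(service, { method: 'POST', id: 'n3', text: '' });
```

Each layer depends only on the one below it, so the storage could be swapped for a database without touching the presentation code.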
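2.3 Choosing a Backend Technology Stack
Category | Common choices | Characteristics |
---|---|---|
Language | Node.js, Python, Go | Each has its own performance and ecosystem strengths |
Web framework | Express, FastAPI, Gin | Provide routing, middleware, and other core features |
Database | PostgreSQL, MongoDB, Redis | Relational vs. document vs. key-value |
Message queue | RabbitMQ, Kafka | Asynchronous task processing and system decoupling |
Authentication | JWT, OAuth2, API keys | Different authentication needs for different scenarios |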
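2.4 Key Backend Architecture Techniques
- Dependency injection: decouples components and simplifies testing
- Middleware: a request-processing pipeline for cross-cutting concerns
- ORM/ODM: object-relational (or object-document) mapping that simplifies database access
- Caching: improves performance and reduces database load
- Logging and monitoring: observability into the running system
- Exception handling: handles errors gracefully and improves stability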
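Two of these techniques, middleware and dependency injection, combine naturally. The sketch below is a simplified synchronous pipeline written for illustration; `compose`, `createRequestLogger`, and the context shape are assumptions and do not reproduce the API of Express or any other framework.

```javascript
// Compose middleware functions into a pipeline; each receives (ctx, next).
function compose(middlewares) {
  return function run(ctx) {
    let index = -1;
    function dispatch(i) {
      if (i <= index) throw new Error('next() called multiple times');
      index = i;
      const fn = middlewares[i];
      if (!fn) return;
      fn(ctx, () => dispatch(i + 1));
    }
    dispatch(0);
    return ctx;
  };
}

// The logger is injected rather than required, so a test can pass a fake.
function createRequestLogger(logger) {
  return (ctx, next) => {
    logger.log(`-> ${ctx.path}`);
    next();
    logger.log(`<- ${ctx.status}`);
  };
}

// Cross-cutting error handling: converts thrown errors into a 500 response.
function errorHandler(ctx, next) {
  try {
    next();
  } catch (error) {
    ctx.status = 500;
    ctx.body = { error: error.message };
  }
}

function handler(ctx) {
  if (ctx.path === '/boom') throw new Error('boom');
  ctx.status = 200;
  ctx.body = { ok: true };
}

const lines = [];
const fakeLogger = { log: (line) => lines.push(line) };
const pipeline = compose([createRequestLogger(fakeLogger), errorHandler, handler]);
const okCtx = pipeline({ path: '/health' });
const errCtx = pipeline({ path: '/boom' });
```

Because the logger arrives as a parameter, the same middleware works with a production logger or with the in-memory fake used here.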
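3. Analysis of the OpenHands Backend Service
From README_CN.md, we can infer that the OpenHands backend service has the following characteristics:
3.1 OpenHands Backend Architecture Characteristics
- Built on Docker containers:
  - Containerized deployment keeps environments consistent
  - The main container and the sandbox container are separated
- State persistence:
  - Conversation history is saved
  - Configuration is stored
  - Workspace state is maintained
- Multiple run modes:
  - Web UI mode
  - Headless mode
  - CLI mode
  - GitHub Action integration
- LLM integration:
  - Supports multiple LLM providers
  - API key management
  - Model call optimization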
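3.2 Inferred OpenHands API Endpoints
Based on the functional requirements, OpenHands may include the following API endpoints:
- /api/chat: handles the conversation between the user and the agent
- /api/execute: executes commands
- /api/files: file operations
- /api/config: configuration management
- /api/tools: tool registration and invocation
- /api/models: LLM model management
- /api/sessions: session state management
- /api/projects: project management (for GitHub Action integration)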
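To make the guessed endpoint list more concrete, here is a small route-matching sketch. Everything in it is hypothetical: the route table, the HTTP methods, the `:id` parameter, and the handler names are assumptions for illustration and are not taken from the OpenHands source.

```javascript
// Hypothetical route table for some of the guessed endpoints.
const routes = [
  { method: 'POST', pattern: '/api/chat', handler: 'chat.send' },
  { method: 'POST', pattern: '/api/execute', handler: 'command.execute' },
  { method: 'GET', pattern: '/api/sessions', handler: 'session.list' },
  { method: 'GET', pattern: '/api/sessions/:id', handler: 'session.get' },
  { method: 'DELETE', pattern: '/api/sessions/:id', handler: 'session.remove' },
];

// Return the matching handler name and extracted path parameters, or null.
function matchRoute(method, path) {
  const pathParts = path.split('/').filter(Boolean);
  for (const route of routes) {
    if (route.method !== method) continue;
    const patternParts = route.pattern.split('/').filter(Boolean);
    if (patternParts.length !== pathParts.length) continue;
    const params = {};
    const matched = patternParts.every((part, i) => {
      if (part.startsWith(':')) {
        params[part.slice(1)] = decodeURIComponent(pathParts[i]);
        return true;
      }
      return part === pathParts[i];
    });
    if (matched) return { handler: route.handler, params };
  }
  return null;
}

const sessionRoute = matchRoute('GET', '/api/sessions/abc');
const listRoute = matchRoute('GET', '/api/sessions');
const noRoute = matchRoute('PUT', '/api/chat');
```

In a real service a web framework's router would do this work; the sketch only shows how resource-style URLs map to handlers.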
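3.3 Data Flow Analysis
In OpenHands, the main data flows through the backend service probably look like this:
- User input → frontend UI → backend API → LLM interpretation
- LLM decision → choice of tool call → tool execution
- Tool result → LLM analysis → response generation → frontend display
- State change → persistent storage → session recovery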
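The first three flows form a loop: the model either answers or requests a tool, and tool results are fed back to the model. The sketch below illustrates that loop with a stub in place of a real LLM; `fakeLlm`, the `add` tool, and the message shapes are invented for the example.

```javascript
// Stub LLM: requests a tool once, then answers using the tool result.
function fakeLlm(messages) {
  const toolMessage = messages.find((m) => m.role === 'tool');
  if (!toolMessage) {
    return { content: '', toolCalls: [{ name: 'add', arguments: { a: 2, b: 3 } }] };
  }
  return { content: `The result is ${toolMessage.content}`, toolCalls: [] };
}

// Hypothetical tool registry keyed by tool name.
const tools = { add: ({ a, b }) => a + b };

// One user turn: call the LLM, run any requested tools, repeat until it answers.
function runTurn(userInput, llm = fakeLlm, maxSteps = 5) {
  const messages = [{ role: 'user', content: userInput }];
  for (let step = 0; step < maxSteps; step++) {
    const reply = llm(messages);
    messages.push({ role: 'assistant', content: reply.content });
    if (reply.toolCalls.length === 0) {
      return { answer: reply.content, messages };
    }
    for (const call of reply.toolCalls) {
      const result = tools[call.name](call.arguments);
      messages.push({ role: 'tool', content: String(result) });
    }
  }
  throw new Error('agent did not finish within maxSteps');
}

const turn = runTurn('What is 2 + 3?');
```

The `maxSteps` bound is a design choice that prevents a misbehaving model from looping forever on tool calls.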
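4. Practice Project: Implementing the Core OpenHands Backend Services
4.1 Project Structure
src/
├── config/                  # Configuration
│   ├── config.js            # Main configuration
│   └── logger.js            # Logger configuration
├── controllers/             # Controllers
│   ├── chatController.js    # Chat controller
│   ├── toolController.js    # Tool controller
│   └── sessionController.js # Session controller
├── middlewares/             # Middleware
│   ├── auth.js              # Authentication middleware
│   ├── errorHandler.js      # Error handling
│   └── requestLogger.js     # Request logging
├── models/                  # Data models
│   ├── session.js           # Session model
│   └── toolExecution.js     # Tool execution record model
├── routes/                  # Route definitions
│   ├── chatRoutes.js        # Chat routes
│   ├── toolRoutes.js        # Tool routes
│   └── configRoutes.js      # Configuration routes
├── services/                # Business services
│   ├── agentService.js      # Core agent service
│   ├── sessionManager.js    # Session management service
│   ├── llmService.js        # LLM integration service
│   ├── toolService.js       # Tool management service
│   └── dockerService.js     # Docker integration service
├── utils/                   # Utility functions
│   ├── promiseUtils.js      # Promise helpers
│   ├── validationUtils.js   # Validation helpers
│   └── fsUtils.js           # File system helpers
└── app.js                   # Application entry point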
4.2 Implementing the Core Service Layer
agentService.js - core agent service:
// src/services/agentService.js
const EventEmitter = require('events');
const { v4: uuidv4 } = require('uuid');
const logger = require('../config/logger');
const llmService = require('./llmService');
const toolService = require('./toolService');
const dockerService = require('./dockerService');
const { SessionManager } = require('./sessionManager');

class AgentService extends EventEmitter {
  constructor() {
    super();
    this.sessions = new SessionManager();
    this.activeAgents = new Map();
    this.defaultSystemPrompt =
      'You are the OpenHands AI development assistant, a professional software development agent. ' +
      'You can write code, run commands, analyze errors, and provide solutions.';
  }

  /**
   * Initialize the agent service.
   */
  async initialize() {
    try {
      // Initialize the LLM service
      await llmService.initialize();
      // Initialize the tool service
      await toolService.initialize();
      // Initialize the Docker service
      await dockerService.initialize();
      // Load persisted sessions
      await this.sessions.loadSessions();
      logger.info('Agent service initialized successfully');
      return true;
    } catch (error) {
      logger.error('Failed to initialize agent service:', error);
      throw error;
    }
  }

  /**
   * Create a new agent session.
   * @param {Object} options - Session options
   * @returns {Promise<String>} - Session ID
   */
  async createSession(options = {}) {
    const sessionId = options.sessionId || uuidv4();

    // Create the new session
    const session = this.sessions.createSession(sessionId, {
      createdAt: new Date(),
      lastActive: new Date(),
      options: {
        llmProvider: options.llmProvider || llmService.getDefaultProvider(),
        systemPrompt: options.systemPrompt || this.defaultSystemPrompt,
        ...options
      },
      history: [],
      state: 'idle'
    });

    // Create a sandbox unless it was explicitly disabled
    if (options.useSandbox !== false) {
      try {
        const sandboxId = await dockerService.createSandbox(sessionId);
        session.sandboxId = sandboxId;
        logger.info(`Created sandbox ${sandboxId} for session ${sessionId}`);
      } catch (error) {
        logger.error(`Failed to create sandbox for session ${sessionId}:`, error);
        // Continue: the session is still created even without a sandbox
      }
    }

    logger.info(`Created new agent session: ${sessionId}`);
    this.emit('sessionCreated', sessionId);
    return sessionId;
  }

  /**
   * Process user input.
   * @param {String} sessionId - Session ID
   * @param {String} input - User input
   * @param {Object} options - Processing options
   * @returns {Promise<Object>} - Processing result
   */
  async processInput(sessionId, input, options = {}) {
    // Make sure the session exists
    if (!this.sessions.hasSession(sessionId)) {
      throw new Error(`Session not found: ${sessionId}`);
    }
    const session = this.sessions.getSession(sessionId);
    session.lastActive = new Date();

    // Update the session state
    this.sessions.updateSessionState(sessionId, 'thinking');
    this.emit('stateChanged', { sessionId, state: 'thinking' });

    try {
      // Snapshot the context before recording the new message: llmService appends
      // `input` itself, so including it in the history would send it twice
      const context = this._prepareContext(session);

      // Add the user message to the history
      this.sessions.addMessageToHistory(sessionId, {
        role: 'user',
        content: input,
        timestamp: new Date()
      });

      // Call the LLM
      logger.info(`Processing input for session ${sessionId}: "${input.substring(0, 50)}..."`);
      const llmResponse = await llmService.completeChat(
        session.options.llmProvider,
        input,
        context,
        session.options
      );

      // Handle tool calls
      if (llmResponse.toolCalls && llmResponse.toolCalls.length > 0) {
        return await this._handleToolCalls(sessionId, llmResponse);
      }

      // Handle a plain text response
      const response = {
        type: 'text',
        content: llmResponse.content,
        sessionId
      };

      // Add the assistant response to the history
      this.sessions.addMessageToHistory(sessionId, {
        role: 'assistant',
        content: llmResponse.content,
        timestamp: new Date()
      });

      // Update the session state
      this.sessions.updateSessionState(sessionId, 'idle');
      this.emit('stateChanged', { sessionId, state: 'idle' });
      return response;
    } catch (error) {
      logger.error(`Error processing input for session ${sessionId}:`, error);
      // Update the session state
      this.sessions.updateSessionState(sessionId, 'error');
      this.emit('stateChanged', { sessionId, state: 'error' });
      throw error;
    }
  }

  /**
   * Handle tool calls.
   * @private
   * @param {String} sessionId - Session ID
   * @param {Object} llmResponse - LLM response
   * @returns {Promise<Object>} - Processing result
   */
  async _handleToolCalls(sessionId, llmResponse) {
    const session = this.sessions.getSession(sessionId);
    const toolCalls = llmResponse.toolCalls;
    const toolResults = [];

    // Update the session state
    this.sessions.updateSessionState(sessionId, 'executing');
    this.emit('stateChanged', { sessionId, state: 'executing' });

    // Add the assistant's "thinking" message to the history
    const assistantMessage = {
      role: 'assistant',
      content: llmResponse.content || 'I will perform some operations...',
      timestamp: new Date(),
      toolCalls: toolCalls.map(call => ({
        tool: call.name,
        args: call.arguments,
        status: 'pending'
      }))
    };
    this.sessions.addMessageToHistory(sessionId, assistantMessage);

    // Execute the tool calls one by one
    for (let i = 0; i < toolCalls.length; i++) {
      const call = toolCalls[i];
      const toolCallIndex = i;
      try {
        logger.info(`Executing tool ${call.name} for session ${sessionId}`);
        // Mark the tool call as running
        this._updateToolCallStatus(sessionId, assistantMessage, toolCallIndex, 'running');

        // Execute the tool
        const result = await toolService.executeTool(
          call.name,
          call.arguments,
          { sessionId, sandboxId: session.sandboxId }
        );

        // Mark the tool call as successful
        this._updateToolCallStatus(sessionId, assistantMessage, toolCallIndex, 'success', { result });
        toolResults.push({ tool: call.name, status: 'success', result });
      } catch (error) {
        // Mark the tool call as failed
        this._updateToolCallStatus(sessionId, assistantMessage, toolCallIndex, 'error', { error: error.message });
        toolResults.push({ tool: call.name, status: 'error', error: error.message });
        logger.error(`Error executing tool ${call.name}:`, error);
      }
    }

    // If any tool failed, ask the LLM how to proceed
    if (toolResults.some(result => result.status === 'error')) {
      // Add the tool results to the history
      this.sessions.addMessageToHistory(sessionId, {
        role: 'system',
        content: `Tool execution results: ${JSON.stringify(toolResults)}`,
        timestamp: new Date()
      });

      // Ask the LLM to handle the errors
      const errorInput = `Some tools failed; please handle these errors: ${JSON.stringify(toolResults)}`;
      const context = this._prepareContext(session);
      const errorHandlingResponse = await llmService.completeChat(
        session.options.llmProvider,
        errorInput,
        context,
        session.options
      );

      // Add the error-handling response to the history
      this.sessions.addMessageToHistory(sessionId, {
        role: 'assistant',
        content: errorHandlingResponse.content,
        timestamp: new Date()
      });
    }

    // Build the result
    const response = {
      type: 'tool_execution',
      content: llmResponse.content,
      toolResults,
      sessionId
    };

    // Update the session state
    this.sessions.updateSessionState(sessionId, 'idle');
    this.emit('stateChanged', { sessionId, state: 'idle' });
    return response;
  }

  /**
   * Update the status of a tool call.
   * @private
   * @param {String} sessionId - Session ID
   * @param {Object} message - Message object
   * @param {Number} index - Tool call index
   * @param {String} status - New status
   * @param {Object} data - Additional data
   */
  _updateToolCallStatus(sessionId, message, index, status, data = {}) {
    // Find the assistant message in the history by its timestamp
    const history = this.sessions.getSessionHistory(sessionId);
    const messageIndex = history.findIndex(msg =>
      msg.role === 'assistant' &&
      msg.toolCalls &&
      msg.timestamp.getTime() === message.timestamp.getTime()
    );

    if (messageIndex !== -1 && history[messageIndex].toolCalls[index]) {
      history[messageIndex].toolCalls[index] = {
        ...history[messageIndex].toolCalls[index],
        status,
        ...data
      };

      // Emit a tool status update event
      this.emit('toolStatusChanged', {
        sessionId,
        messageIndex,
        toolCallIndex: index,
        status,
        data
      });
    }
  }

  /**
   * Prepare the LLM context.
   * @private
   * @param {Object} session - Session object
   * @returns {Object} - Context object
   */
  _prepareContext(session) {
    return {
      systemPrompt: session.options.systemPrompt,
      // Copy the array so later history changes do not alter this snapshot
      history: [...session.history],
      availableTools: toolService.getAvailableTools()
    };
  }

  /**
   * Execute a command.
   * @param {String} sessionId - Session ID
   * @param {String} command - Command string
   * @returns {Promise<Object>} - Execution result
   */
  async executeCommand(sessionId, command) {
    // Make sure the session exists
    if (!this.sessions.hasSession(sessionId)) {
      throw new Error(`Session not found: ${sessionId}`);
    }
    const session = this.sessions.getSession(sessionId);

    // Update the session state
    this.sessions.updateSessionState(sessionId, 'executing');
    this.emit('stateChanged', { sessionId, state: 'executing' });

    try {
      // Record the command in the history
      const commandMessage = {
        role: 'user',
        content: `Run command: ${command}`,
        timestamp: new Date()
      };
      this.sessions.addMessageToHistory(sessionId, commandMessage);

      // Execute the command
      const result = await toolService.executeTool(
        'execute_command',
        { command },
        { sessionId, sandboxId: session.sandboxId }
      );

      // Add the result to the history
      const resultMessage = {
        role: 'system',
        content: `Command output:\n${result.stdout || ''}\n${result.stderr || ''}`,
        timestamp: new Date(),
        command,
        result
      };
      this.sessions.addMessageToHistory(sessionId, resultMessage);

      // Update the session state
      this.sessions.updateSessionState(sessionId, 'idle');
      this.emit('stateChanged', { sessionId, state: 'idle' });
      return { type: 'command_execution', command, result, sessionId };
    } catch (error) {
      logger.error(`Error executing command for session ${sessionId}:`, error);

      // Add the error to the history
      this.sessions.addMessageToHistory(sessionId, {
        role: 'system',
        content: `Command error: ${error.message}`,
        timestamp: new Date(),
        command,
        error: error.message
      });

      // Update the session state
      this.sessions.updateSessionState(sessionId, 'error');
      this.emit('stateChanged', { sessionId, state: 'error' });
      throw error;
    }
  }

  /**
   * Get the session history.
   * @param {String} sessionId - Session ID
   * @returns {Array} - Session history messages
   */
  getSessionHistory(sessionId) {
    return this.sessions.getSessionHistory(sessionId);
  }

  /**
   * Get the session state.
   * @param {String} sessionId - Session ID
   * @returns {Object} - Session state
   */
  getSessionState(sessionId) {
    if (!this.sessions.hasSession(sessionId)) {
      throw new Error(`Session not found: ${sessionId}`);
    }
    const session = this.sessions.getSession(sessionId);
    return {
      id: sessionId,
      state: session.state,
      createdAt: session.createdAt,
      lastActive: session.lastActive,
      historyLength: session.history.length,
      sandboxId: session.sandboxId
    };
  }

  /**
   * List all sessions.
   * @returns {Array} - Session list
   */
  listSessions() {
    return this.sessions.listSessions().map(session => ({
      id: session.id,
      state: session.state,
      createdAt: session.createdAt,
      lastActive: session.lastActive,
      historyLength: session.history.length
    }));
  }

  /**
   * Delete a session.
   * @param {String} sessionId - Session ID
   * @returns {Promise<Boolean>} - Whether the session was deleted
   */
  async deleteSession(sessionId) {
    if (!this.sessions.hasSession(sessionId)) {
      return false;
    }
    const session = this.sessions.getSession(sessionId);

    // Remove the sandbox, if there is one
    if (session.sandboxId) {
      try {
        await dockerService.removeSandbox(session.sandboxId);
        logger.info(`Removed sandbox ${session.sandboxId} for session ${sessionId}`);
      } catch (error) {
        logger.error(`Error removing sandbox for session ${sessionId}:`, error);
      }
    }

    // Delete the session; SessionManager.deleteSession is async, so it must be awaited
    const result = await this.sessions.deleteSession(sessionId);
    if (result) {
      this.emit('sessionDeleted', sessionId);
      logger.info(`Deleted session: ${sessionId}`);
    }
    return result;
  }

  /**
   * Shut down the service.
   */
  async shutdown() {
    try {
      // Save all sessions
      await this.sessions.saveSessions();

      // Remove all sandboxes
      const allSessions = this.sessions.listSessions();
      for (const session of allSessions) {
        if (session.sandboxId) {
          try {
            await dockerService.removeSandbox(session.sandboxId);
            logger.info(`Removed sandbox ${session.sandboxId} during shutdown`);
          } catch (error) {
            logger.error(`Error removing sandbox ${session.sandboxId} during shutdown:`, error);
          }
        }
      }

      // Shut down the other services
      await llmService.shutdown();
      await toolService.shutdown();
      await dockerService.shutdown();
      logger.info('Agent service shut down successfully');
    } catch (error) {
      logger.error('Error during agent service shutdown:', error);
      throw error;
    }
  }
}

// Create the singleton instance
const agentService = new AgentService();
module.exports = agentService;
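The agent service moves a session between the states `idle`, `thinking`, `executing`, and `error`, and announces each change with a `stateChanged` event. As a sketch of how those transitions could be validated, the class below guards them with a transition table. The table and the `SessionState` class are assumptions inferred from the states used in the listing; they are not part of the original service.

```javascript
const EventEmitter = require('node:events');

// Allowed transitions, inferred from the states used by the agent service.
const TRANSITIONS = {
  idle: ['thinking', 'executing'],
  thinking: ['executing', 'idle', 'error'],
  executing: ['idle', 'error'],
  error: ['thinking', 'executing'],
};

class SessionState extends EventEmitter {
  constructor() {
    super();
    this.state = 'idle';
  }

  // Move to `next` if the table allows it, then notify listeners.
  transition(next) {
    if (!TRANSITIONS[this.state].includes(next)) {
      throw new Error(`Invalid transition: ${this.state} -> ${next}`);
    }
    const previous = this.state;
    this.state = next;
    this.emit('stateChanged', { previous, state: next });
  }
}

const session = new SessionState();
const seen = [];
session.on('stateChanged', (event) => seen.push(event.state));
session.transition('thinking');
session.transition('executing');
session.transition('idle');
```

A frontend could subscribe to the same event (for example over a WebSocket) to show what the agent is currently doing.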
sessionManager.js - session management service:
// src/services/sessionManager.js
const fs = require('fs').promises;
const path = require('path');
const logger = require('../config/logger');
const { ensureDirectoryExists } = require('../utils/fsUtils');

class SessionManager {
  constructor(options = {}) {
    this.sessionsDir = options.sessionsDir || path.join(process.cwd(), '.openhands', 'sessions');
    this.sessions = new Map();
    this.maxHistory = options.maxHistory || 200;
    this.saveInterval = options.saveInterval || 60000; // 1 minute
    this.saveTimer = null;
  }

  /**
   * Initialize the session manager.
   */
  async initialize() {
    // Make sure the sessions directory exists
    await ensureDirectoryExists(this.sessionsDir);
    // Load sessions
    await this.loadSessions();
    // Set up periodic saving
    this.saveTimer = setInterval(() => {
      this.saveSessions().catch(err => {
        logger.error('Error auto-saving sessions:', err);
      });
    }, this.saveInterval);
  }

  /**
   * Load all sessions.
   */
  async loadSessions() {
    try {
      await ensureDirectoryExists(this.sessionsDir);
      const files = await fs.readdir(this.sessionsDir);
      const sessionFiles = files.filter(file => file.endsWith('.json'));

      for (const file of sessionFiles) {
        try {
          const sessionId = path.basename(file, '.json');
          const sessionData = await fs.readFile(path.join(this.sessionsDir, file), 'utf8');
          const parsedSession = JSON.parse(sessionData);

          // JSON stores dates as strings, so convert them back to Date objects
          if (parsedSession.createdAt) {
            parsedSession.createdAt = new Date(parsedSession.createdAt);
          }
          if (parsedSession.lastActive) {
            parsedSession.lastActive = new Date(parsedSession.lastActive);
          }

          // Convert the dates in the history as well
          if (parsedSession.history && Array.isArray(parsedSession.history)) {
            parsedSession.history = parsedSession.history.map(msg => ({
              ...msg,
              timestamp: msg.timestamp ? new Date(msg.timestamp) : new Date()
            }));
          }

          this.sessions.set(sessionId, parsedSession);
          logger.debug(`Loaded session: ${sessionId}`);
        } catch (error) {
          logger.error(`Error loading session from ${file}:`, error);
        }
      }
      logger.info(`Loaded ${this.sessions.size} sessions`);
    } catch (error) {
      logger.error('Error loading sessions:', error);
      throw error;
    }
  }

  /**
   * Save all sessions.
   */
  async saveSessions() {
    try {
      await ensureDirectoryExists(this.sessionsDir);
      const savePromises = [];
      for (const [sessionId, session] of this.sessions.entries()) {
        const sessionFile = path.join(this.sessionsDir, `${sessionId}.json`);
        const sessionJson = JSON.stringify(session, null, 2);
        savePromises.push(fs.writeFile(sessionFile, sessionJson, 'utf8'));
      }
      await Promise.all(savePromises);
      logger.debug(`Saved ${this.sessions.size} sessions`);
      return true;
    } catch (error) {
      logger.error('Error saving sessions:', error);
      throw error;
    }
  }

  /**
   * Create a new session.
   * @param {String} sessionId - Session ID
   * @param {Object} sessionData - Session data
   * @returns {Object} - The created session
   */
  createSession(sessionId, sessionData = {}) {
    const session = {
      id: sessionId,
      createdAt: new Date(),
      lastActive: new Date(),
      state: 'idle',
      history: [],
      ...sessionData
    };
    this.sessions.set(sessionId, session);
    return session;
  }

  /**
   * Get a session.
   * @param {String} sessionId - Session ID
   * @returns {Object|null} - The session object, or null
   */
  getSession(sessionId) {
    return this.sessions.get(sessionId) || null;
  }

  /**
   * Check whether a session exists.
   * @param {String} sessionId - Session ID
   * @returns {Boolean} - Whether the session exists
   */
  hasSession(sessionId) {
    return this.sessions.has(sessionId);
  }

  /**
   * Update the session state.
   * @param {String} sessionId - Session ID
   * @param {String} state - New state
   * @returns {Boolean} - Whether the update succeeded
   */
  updateSessionState(sessionId, state) {
    const session = this.sessions.get(sessionId);
    if (!session) return false;
    session.state = state;
    session.lastActive = new Date();
    return true;
  }

  /**
   * Add a message to the session history.
   * @param {String} sessionId - Session ID
   * @param {Object} message - Message object
   * @returns {Boolean} - Whether the message was added
   */
  addMessageToHistory(sessionId, message) {
    const session = this.sessions.get(sessionId);
    if (!session) return false;

    // Make sure the message has a timestamp
    if (!message.timestamp) {
      message.timestamp = new Date();
    }

    // Add the message to the history
    session.history.push(message);

    // Cap the history length, keeping the most recent messages
    if (session.history.length > this.maxHistory) {
      session.history = session.history.slice(session.history.length - this.maxHistory);
    }
    session.lastActive = new Date();
    return true;
  }

  /**
   * Get the session history.
   * @param {String} sessionId - Session ID
   * @returns {Array} - Session history
   */
  getSessionHistory(sessionId) {
    const session = this.sessions.get(sessionId);
    return session ? session.history : [];
  }

  /**
   * List all sessions.
   * @returns {Array} - Session list
   */
  listSessions() {
    return Array.from(this.sessions.values());
  }

  /**
   * Delete a session.
   * @param {String} sessionId - Session ID
   * @returns {Promise<Boolean>} - Whether the session was deleted
   */
  async deleteSession(sessionId) {
    if (!this.sessions.has(sessionId)) return false;

    // Remove it from memory
    this.sessions.delete(sessionId);

    // Remove it from disk
    try {
      const sessionFile = path.join(this.sessionsDir, `${sessionId}.json`);
      await fs.unlink(sessionFile);
    } catch (error) {
      logger.error(`Error deleting session file for ${sessionId}:`, error);
      // The deletion still counts as successful even if the file could not be removed
    }
    return true;
  }

  /**
   * Clean up expired sessions.
   * @param {Number} maxAge - Maximum session age in milliseconds (default: 7 days)
   * @returns {Promise<Number>} - Number of sessions removed
   */
  async cleanupSessions(maxAge = 7 * 24 * 60 * 60 * 1000) {
    const now = new Date();
    let cleanedCount = 0;
    for (const [sessionId, session] of this.sessions.entries()) {
      const age = now - session.lastActive;
      if (age > maxAge) {
        await this.deleteSession(sessionId);
        cleanedCount++;
      }
    }
    return cleanedCount;
  }

  /**
   * Shut down the session manager.
   */
  async shutdown() {
    // Cancel auto-saving
    if (this.saveTimer) {
      clearInterval(this.saveTimer);
      this.saveTimer = null;
    }
    // Save all sessions
    await this.saveSessions();
  }
}

module.exports = { SessionManager };
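Two details of the session manager are easy to get wrong: `Date` values become ISO strings when a session is serialized to JSON, and the history must be trimmed from the front so the newest messages survive. The following self-contained sketch demonstrates both; `reviveSession` and `trimHistory` are hypothetical helper names that mirror the logic in `loadSessions` and `addMessageToHistory`.

```javascript
// Dates become ISO strings in JSON; convert the known date fields back.
function reviveSession(json) {
  const session = JSON.parse(json);
  session.createdAt = new Date(session.createdAt);
  session.lastActive = new Date(session.lastActive);
  session.history = (session.history || []).map((msg) => ({
    ...msg,
    timestamp: new Date(msg.timestamp),
  }));
  return session;
}

// Keep only the newest `maxHistory` messages.
function trimHistory(history, maxHistory) {
  return history.length > maxHistory
    ? history.slice(history.length - maxHistory)
    : history;
}

const original = {
  id: 's1',
  createdAt: new Date('2025-01-01T00:00:00Z'),
  lastActive: new Date('2025-01-02T00:00:00Z'),
  history: [
    { role: 'user', content: 'hi', timestamp: new Date('2025-01-01T00:00:01Z') },
  ],
};
const restored = reviveSession(JSON.stringify(original));
const trimmed = trimHistory([1, 2, 3, 4], 2);
```

Without the revival step, a call such as `msg.timestamp.getTime()` would fail on a reloaded session because the timestamp would still be a string.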
dockerService.js - Docker integration service:
// src/services/dockerService.js
const { execFile } = require('child_process');
const { promisify } = require('util');
const logger = require('../config/logger');
const config = require('../config/config');

const execFileAsync = promisify(execFile);

class DockerService {
  constructor() {
    this.sandboxImage = config.sandbox.image || 'docker.all-hands.dev/all-hands-ai/runtime:0.47-nikolaik';
    this.activeSandboxes = new Map();
  }

  /**
   * Initialize the Docker service.
   */
  async initialize() {
    try {
      // Check that Docker is available
      await this._checkDockerAvailability();
      // Pull the sandbox image
      await this._pullSandboxImage();
      logger.info('Docker service initialized successfully');
      return true;
    } catch (error) {
      logger.error('Failed to initialize Docker service:', error);
      throw error;
    }
  }

  /**
   * Check that Docker is available.
   * @private
   */
  async _checkDockerAvailability() {
    try {
      const { stdout } = await execFileAsync('docker', ['version', '--format', '{{.Server.Version}}']);
      logger.info(`Docker available, version: ${stdout.trim()}`);
      return true;
    } catch (error) {
      logger.error('Docker not available:', error);
      throw new Error('Docker is not available. Please make sure Docker is installed and running.');
    }
  }

  /**
   * Pull the sandbox image.
   * @private
   */
  async _pullSandboxImage() {
    try {
      logger.info(`Pulling sandbox image: ${this.sandboxImage}`);
      const { stderr } = await execFileAsync('docker', ['pull', this.sandboxImage]);
      if (stderr && !stderr.includes('up to date')) {
        logger.warn(`Warning while pulling image: ${stderr}`);
      }
      logger.info(`Sandbox image pulled: ${this.sandboxImage}`);
      return true;
    } catch (error) {
      logger.error(`Failed to pull sandbox image ${this.sandboxImage}:`, error);
      throw error;
    }
  }

  /**
   * Create a sandbox container.
   * @param {String} sessionId - ID of the associated session
   * @returns {Promise<String>} - Sandbox container ID
   */
  async createSandbox(sessionId) {
    try {
      const containerName = `openhands-sandbox-${sessionId.substring(0, 8)}`;
      // Name of the volume that holds the working directory
      const workdirVolume = `openhands-workdir-${sessionId.substring(0, 8)}`;

      // Create the data volume
      await execFileAsync('docker', ['volume', 'create', workdirVolume]);

      // Start the container. execFile arguments must be strings, so numeric
      // configuration values are converted explicitly.
      const args = [
        'run',
        '-d', // Run in the background
        '--name', containerName,
        // Resource limits
        '--cpus', String(config.sandbox.cpus || '1'),
        '--memory', String(config.sandbox.memory || '1g'),
        '--pids-limit', String(config.sandbox.pidsLimit || '100'),
        // Security restrictions
        '--security-opt', 'no-new-privileges',
        '--cap-drop', 'ALL',
        '--cap-add', 'CHOWN',
        '--cap-add', 'DAC_OVERRIDE',
        '--cap-add', 'SETGID',
        '--cap-add', 'SETUID',
        // Mount the working directory
        '-v', `${workdirVolume}:/workspace`,
        // Network settings
        '--network', config.sandbox.network || 'bridge',
        // Environment variables
        '-e', 'OPENHANDS_SANDBOX=true',
        '-e', `SESSION_ID=${sessionId}`,
        // Image
        this.sandboxImage
      ];

      // Run `docker run`
      const { stdout } = await execFileAsync('docker', args);
      const containerId = stdout.trim();

      // Record the sandbox
      this.activeSandboxes.set(containerId, {
        sessionId,
        containerName,
        workdirVolume,
        createdAt: new Date()
      });
      logger.info(`Created sandbox container ${containerId} for session ${sessionId}`);
      return containerId;
    } catch (error) {
      logger.error(`Failed to create sandbox for session ${sessionId}:`, error);
      throw error;
    }
  }

  /**
   * Execute a command inside a sandbox.
   * @param {String} sandboxId - Sandbox container ID
   * @param {String} command - Command to execute
   * @param {Object} options - Execution options
   * @returns {Promise<Object>} - Execution result
   */
  async execInSandbox(sandboxId, command, options = {}) {
    try {
      if (!this.activeSandboxes.has(sandboxId)) {
        throw new Error(`Sandbox ${sandboxId} not found`);
      }
      const { workingDir = '/workspace', timeout = 30000 } = options;

      // Build the exec command. `docker exec` has no timeout flag, so the limit
      // is applied to the docker CLI process through execFile's `timeout` option.
      const args = [
        'exec',
        // Working directory
        '-w', workingDir,
        // Container ID
        sandboxId,
        // Command to run
        'bash', '-c', command
      ];
      logger.debug(`Executing command in sandbox ${sandboxId}: ${command}`);

      // Run the command
      const { stdout, stderr } = await execFileAsync('docker', args, { timeout });
      return { stdout, stderr, exitCode: 0 };
    } catch (error) {
      // Report failures as a result object instead of throwing
      logger.error(`Error executing command in sandbox ${sandboxId}:`, error);
      return {
        stdout: error.stdout || '',
        stderr: error.stderr || error.message,
        // error.code is the exit code for a failed command, but a string such as 'ENOENT' for spawn errors
        exitCode: typeof error.code === 'number' ? error.code : 1
      };
    }
  }

  /**
   * Copy a file into a sandbox.
   * @param {String} sandboxId - Sandbox container ID
   * @param {String} localPath - Local path
   * @param {String} containerPath - Path inside the container
   * @returns {Promise<Boolean>} - Whether the copy succeeded
   */
  async copyToSandbox(sandboxId, localPath, containerPath) {
    try {
      if (!this.activeSandboxes.has(sandboxId)) {
        throw new Error(`Sandbox ${sandboxId} not found`);
      }
      // docker cp
      await execFileAsync('docker', ['cp', localPath, `${sandboxId}:${containerPath}`]);
      logger.debug(`Copied ${localPath} to sandbox ${sandboxId}:${containerPath}`);
      return true;
    } catch (error) {
      logger.error(`Error copying to sandbox ${sandboxId}:`, error);
      throw error;
    }
  }

  /**
   * Copy a file out of a sandbox.
   * @param {String} sandboxId - Sandbox container ID
   * @param {String} containerPath - Path inside the container
   * @param {String} localPath - Local path
   * @returns {Promise<Boolean>} - Whether the copy succeeded
   */
  async copyFromSandbox(sandboxId, containerPath, localPath) {
    try {
      if (!this.activeSandboxes.has(sandboxId)) {
        throw new Error(`Sandbox ${sandboxId} not found`);
      }
      // docker cp
      await execFileAsync('docker', ['cp', `${sandboxId}:${containerPath}`, localPath]);
      logger.debug(`Copied from sandbox ${sandboxId}:${containerPath} to ${localPath}`);
      return true;
    } catch (error) {
      logger.error(`Error copying from sandbox ${sandboxId}:`, error);
      throw error;
    }
  }

  /**
   * Remove a sandbox.
   * @param {String} sandboxId - Sandbox container ID
   * @returns {Promise<Boolean>} - Whether the sandbox was removed
   */
  async removeSandbox(sandboxId) {
    try {
      if (!this.activeSandboxes.has(sandboxId)) {
        logger.warn(`Sandbox ${sandboxId} not found, might have been removed already`);
        return false;
      }
      const sandboxInfo = this.activeSandboxes.get(sandboxId);

      // Stop and remove the container
      try {
        await execFileAsync('docker', ['stop', sandboxId]);
        await execFileAsync('docker', ['rm', sandboxId]);
        logger.debug(`Removed container ${sandboxId}`);
      } catch (error) {
        logger.error(`Error removing container ${sandboxId}:`, error);
      }

      // Remove the volume
      try {
        await execFileAsync('docker', ['volume', 'rm', sandboxInfo.workdirVolume]);
        logger.debug(`Removed volume ${sandboxInfo.workdirVolume}`);
      } catch (error) {
        logger.error(`Error removing volume ${sandboxInfo.workdirVolume}:`, error);
      }

      // Remove the record
      this.activeSandboxes.delete(sandboxId);
      logger.info(`Removed sandbox ${sandboxId}`);
      return true;
    } catch (error) {
      logger.error(`Error removing sandbox ${sandboxId}:`, error);
      throw error;
    }
  }

  /**
   * Remove all sandboxes.
   */
  async shutdown() {
    logger.info(`Shutting down Docker service, removing ${this.activeSandboxes.size} sandboxes...`);
    const removePromises = [];
    for (const [sandboxId] of this.activeSandboxes) {
      removePromises.push(
        this.removeSandbox(sandboxId).catch(error => {
          logger.error(`Error removing sandbox ${sandboxId} during shutdown:`, error);
        })
      );
    }
    await Promise.all(removePromises);
    logger.info('Docker service shutdown complete');
  }
}

// Create the singleton instance
const dockerService = new DockerService();
module.exports = dockerService;
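Building the `docker run` argument list as a pure function makes the sandbox limits easy to unit-test without a Docker daemon. The sketch below is a simplified variant of the argument list in `createSandbox`; `buildSandboxArgs`, its parameter names, and the image name `example/runtime:latest` are hypothetical.

```javascript
// Build the `docker run` argument list for a sandbox.
// The defaults in `limits` are illustrative, not recommended values.
function buildSandboxArgs({ sessionId, image, limits = {} }) {
  const shortId = sessionId.slice(0, 8);
  const { cpus = 1, memory = '1g', pidsLimit = 100, network = 'bridge' } = limits;
  return [
    'run', '-d',
    '--name', `openhands-sandbox-${shortId}`,
    // Every value is converted to a string because execFile rejects non-string arguments.
    '--cpus', String(cpus),
    '--memory', String(memory),
    '--pids-limit', String(pidsLimit),
    '--security-opt', 'no-new-privileges',
    '--cap-drop', 'ALL',
    '-v', `openhands-workdir-${shortId}:/workspace`,
    '--network', String(network),
    image,
  ];
}

const args = buildSandboxArgs({
  sessionId: '0123456789abcdef',
  image: 'example/runtime:latest',
  limits: { cpus: 2 },
});
```

The resulting array can be passed directly to `execFile('docker', args)`, which avoids shell interpolation of the session ID or image name.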
llmService.js - LLM integration service:
// src/services/llmService.js
const axios = require('axios');
const logger = require('../config/logger');
const config = require('../config/config');

class LLMService {
  constructor() {
    this.providers = new Map();
    this.defaultProvider = 'anthropic';
  }

  /**
   * Initialize the LLM service.
   */
  async initialize() {
    try {
      // Register the default providers
      this.registerProvider('anthropic', {
        name: 'Anthropic',
        baseUrl: 'https://api.anthropic.com/v1',
        defaultModel: 'claude-sonnet-4-20250514',
        apiKeyName: 'ANTHROPIC_API_KEY',
        apiKey: process.env.ANTHROPIC_API_KEY || config.llm?.anthropic?.apiKey || '',
        requestHandler: this._anthropicRequestHandler.bind(this)
      });
      this.registerProvider('openai', {
        name: 'OpenAI',
        baseUrl: 'https://api.openai.com/v1',
        defaultModel: 'gpt-4o',
        apiKeyName: 'OPENAI_API_KEY',
        apiKey: process.env.OPENAI_API_KEY || config.llm?.openai?.apiKey || '',
        requestHandler: this._openaiRequestHandler.bind(this)
      });
      logger.info('LLM service initialized successfully');
      return true;
    } catch (error) {
      logger.error('Failed to initialize LLM service:', error);
      throw error;
    }
  }

  /**
   * Register an LLM provider.
   * @param {String} id - Provider ID
   * @param {Object} config - Provider configuration
   */
  registerProvider(id, config) {
    this.providers.set(id, config);
    logger.debug(`Registered LLM provider: ${id}`);
  }

  /**
   * Get the default provider.
   * @returns {String} - Default provider ID
   */
  getDefaultProvider() {
    return this.defaultProvider;
  }

  /**
   * Set the default provider.
   * @param {String} providerId - Provider ID
   */
  setDefaultProvider(providerId) {
    if (!this.providers.has(providerId)) {
      throw new Error(`LLM provider ${providerId} not registered`);
    }
    this.defaultProvider = providerId;
  }

  /**
   * Set a provider's API key.
   * @param {String} providerId - Provider ID
   * @param {String} apiKey - API key
   */
  setApiKey(providerId, apiKey) {
    if (!this.providers.has(providerId)) {
      throw new Error(`LLM provider ${providerId} not registered`);
    }
    const provider = this.providers.get(providerId);
    provider.apiKey = apiKey;
  }

  /**
   * Get the list of available providers.
   * @returns {Array} - Provider list
   */
  getAvailableProviders() {
    return Array.from(this.providers.entries()).map(([id, config]) => ({
      id,
      name: config.name,
      defaultModel: config.defaultModel,
      hasApiKey: !!config.apiKey
    }));
  }

  /**
   * Run a chat completion.
   * @param {String} providerId - Provider ID
   * @param {String} input - User input
   * @param {Object} context - Context
   * @param {Object} options - Options
   * @returns {Promise<Object>} - Response object
   */
  async completeChat(providerId, input, context, options = {}) {
    // Resolve the ID first so that log and error messages name the provider actually used
    const resolvedId = providerId || this.defaultProvider;
    const provider = this.providers.get(resolvedId);
    if (!provider) {
      throw new Error(`LLM provider ${resolvedId} not found`);
    }
    if (!provider.apiKey) {
      throw new Error(`API key not set for provider ${resolvedId}`);
    }

    // Prepare the request parameters
    const requestParams = {
      model: options.model || provider.defaultModel,
      input,
      context,
      options
    };

    try {
      // Call the provider-specific handler
      logger.debug(`Calling LLM provider ${resolvedId} for completion`);
      const response = await provider.requestHandler(requestParams);
      return response;
    } catch (error) {
      logger.error(`Error calling LLM provider ${resolvedId}:`, error);
      throw error;
    }
  }

  /**
   * Anthropic request handler.
   * @private
   * @param {Object} params - Request parameters
   * @returns {Promise<Object>} - Response object
   */
  async _anthropicRequestHandler(params) {
    const provider = this.providers.get('anthropic');

    // Build the message history. The Messages API takes the system prompt as a
    // top-level `system` field, not as a message with role "system".
    const messages = [];
    if (params.context.history && Array.isArray(params.context.history)) {
      for (const message of params.context.history) {
        if (message.role === 'user' || message.role === 'assistant') {
          messages.push({ role: message.role, content: message.content });
        }
      }
    }

    // Add the tool definitions
    const tools = [];
    if (params.context.availableTools && Array.isArray(params.context.availableTools)) {
      for (const tool of params.context.availableTools) {
        tools.push({
          name: tool.name,
          description: tool.description,
          input_schema: tool.schema
        });
      }
    }

    // Add the current user message
    messages.push({ role: 'user', content: params.input });

    // Build the request body; `??` keeps explicit zero values such as temperature: 0
    const requestBody = {
      model: params.model,
      messages,
      max_tokens: params.options.maxTokens ?? 2048,
      temperature: params.options.temperature ?? 0.7,
      top_p: params.options.topP ?? 0.95
    };
    if (params.context.systemPrompt) {
      requestBody.system = params.context.systemPrompt;
    }
    // Add the tool definitions to the request, if any
    if (tools.length > 0) {
      requestBody.tools = tools;
    }

    // Send the request
    try {
      const response = await axios({
        method: 'post',
        url: `${provider.baseUrl}/messages`,
        headers: {
          'Content-Type': 'application/json',
          'x-api-key': provider.apiKey,
          'anthropic-version': '2023-06-01'
        },
        data: requestBody
      });

      // Parse the response. `content` is a list of blocks; text blocks carry `text`,
      // and tool_use blocks carry `name` and `input` directly on the block.
      const blocks = response.data.content || [];
      const content = blocks
        .filter(block => block.type === 'text')
        .map(block => block.text)
        .join('');
      const toolCalls = blocks
        .filter(block => block.type === 'tool_use')
        .map(block => ({ name: block.name, arguments: block.input }));

      return {
        content,
        toolCalls,
        model: response.data.model,
        usage: response.data.usage
      };
    } catch (error) {
      if (error.response) {
        logger.error('Anthropic API error:', error.response.data);
        throw new Error(`Anthropic API error: ${error.response.data.error?.message || error.message}`);
      }
      throw error;
    }
  }

  /**
   * OpenAI request handler.
   * @private
   * @param {Object} params - Request parameters
   * @returns {Promise<Object>} - Response object
   */
  async _openaiRequestHandler(params) {
    const provider = this.providers.get('openai');

    // Build the message history
    const messages = [];

    // Add the system message
    if (params.context.systemPrompt) {
      messages.push({ role: 'system', content: params.context.systemPrompt });
    }

    // Add the history messages
    if (params.context.history && Array.isArray(params.context.history)) {
      for (const message of params.context.history) {
        if (message.role === 'user' || message.role === 'assistant' || message.role === 'system') {
          messages.push({ role: message.role, content: message.content });
        }
      }
    }

    // Add the current user message
    messages.push({ role: 'user', content: params.input });

    // Build the tool definitions
    const tools = [];
    if (params.context.availableTools && Array.isArray(params.context.availableTools)) {
      for (const tool of params.context.availableTools) {
        tools.push({
          type: 'function',
          function: {
            name: tool.name,
            description: tool.description,
            parameters: tool.schema
          }
        });
      }
    }

    // Build the request body
    const requestBody = {
      model: params.model,
      messages,
      max_tokens: params.options.maxTokens ?? 2048,
      temperature: params.options.temperature ?? 0.7,
      top_p: params.options.topP ?? 0.95
    };

    // Add the tool definitions to the request, if any
    if (tools.length > 0) {
      requestBody.tools = tools;
      requestBody.tool_choice = 'auto';
    }

    // Send the request
    try {
      const response = await axios({
        method: 'post',
        url: `${provider.baseUrl}/chat/completions`,
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${provider.apiKey}`
        },
        data: requestBody
      });

      // Parse the response
      const content = response.data.choices[0]?.message?.content || '';

      // Check for tool calls
      const toolCalls = [];
      if (response.data.choices[0]?.message?.tool_calls) {
        for (const toolCall of response.data.choices[0].message.tool_calls) {
          if (toolCall.type === 'function') {
            toolCalls.push({
              name: toolCall.function.name,
              arguments: JSON.parse(toolCall.function.arguments)
            });
          }
        }
      }

      return {
        content,
        toolCalls,
        model: response.data.model,
        usage: response.data.usage
      };
    } catch (error) {
      if (error.response) {
        logger.error('OpenAI API error:', error.response.data);
        throw new Error(`OpenAI API error: ${error.response.data.error?.message || error.message}`);
      }
      throw error;
    }
  }

  /**
   * Shut down the LLM service.
   */
  async shutdown() {
    logger.info('LLM service shutdown');
  }
}

// Create the singleton instance
const llmService = new LLMService();
module.exports = llmService;
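Both handlers normalize provider responses into the same `{ content, toolCalls }` shape. The sketch below isolates that normalization as two pure functions run against hand-written sample payloads. The function names are hypothetical, and the sample payloads only imitate the general shape of each provider's response (content blocks for Anthropic, `message.tool_calls` with JSON-string arguments for OpenAI); they are not captured API output.

```javascript
// Anthropic-style response: a list of content blocks of type "text" or "tool_use".
function normalizeAnthropic(blocks = []) {
  const content = blocks
    .filter((block) => block.type === 'text')
    .map((block) => block.text)
    .join('');
  const toolCalls = blocks
    .filter((block) => block.type === 'tool_use')
    .map((block) => ({ name: block.name, arguments: block.input }));
  return { content, toolCalls };
}

// OpenAI-style response: one message with optional tool_calls whose
// arguments arrive as a JSON string.
function normalizeOpenAI(message = {}) {
  const toolCalls = (message.tool_calls || [])
    .filter((call) => call.type === 'function')
    .map((call) => ({
      name: call.function.name,
      arguments: JSON.parse(call.function.arguments),
    }));
  return { content: message.content || '', toolCalls };
}

const fromAnthropic = normalizeAnthropic([
  { type: 'text', text: 'Let me check.' },
  { type: 'tool_use', id: 'toolu_01', name: 'execute_command', input: { command: 'ls' } },
]);
const fromOpenAI = normalizeOpenAI({
  content: null,
  tool_calls: [
    { id: 'call_1', type: 'function', function: { name: 'execute_command', arguments: '{"command":"ls"}' } },
  ],
});
```

With a shared shape, the agent service can handle tool calls without knowing which provider produced them.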
toolService.js - 工具管理服务:
// src/services/toolService.js
const EventEmitter = require('events');
const fs = require('fs').promises;
const path = require('path');
const logger = require('../config/logger');
const { ensureDirectoryExists } = require('../utils/fsUtils');
const dockerService = require('./dockerService');

/**
 * Quote a value for safe interpolation into a POSIX shell command line.
 * Single-quoting (and escaping embedded single quotes) keeps characters
 * such as ", $, ` and ; in a path from being interpreted by the shell.
 * @param {String} value - Raw value
 * @returns {String} - Shell-safe quoted value
 */
function shellQuote(value) {
  return "'" + String(value).replace(/'/g, "'\\''") + "'";
}

class ToolService extends EventEmitter {
  constructor() {
    super();
    this.toolsDir = path.join(process.cwd(), 'src', 'tools');
    this.tools = new Map();
    this.disabledTools = new Set();
  }

  /**
   * Initialize the tool service
   */
  async initialize() {
    try {
      // Make sure the tools directory exists
      await ensureDirectoryExists(this.toolsDir);

      // Load the built-in tools
      await this._loadBuiltInTools();

      logger.info(`Tool service initialized with ${this.tools.size} tools`);
      return true;
    } catch (error) {
      logger.error('Failed to initialize tool service:', error);
      throw error;
    }
  }

  /**
   * Load the built-in tools
   * @private
   */
  async _loadBuiltInTools() {
    // File system tool
    this.registerTool('file_system', {
      name: 'file_system',
      description: 'Perform file system operations such as reading, writing, and listing files',
      schema: {
        type: 'object',
        properties: {
          operation: {
            type: 'string',
            enum: ['read', 'write', 'list', 'exists', 'delete'],
            description: 'The file operation to perform'
          },
          path: {
            type: 'string',
            description: 'File or directory path'
          },
          content: {
            type: 'string',
            description: 'File content for write operations'
          }
        },
        required: ['operation', 'path']
      },
      execute: this._executeFileSystemTool.bind(this)
    });

    // Command execution tool
    this.registerTool('execute_command', {
      name: 'execute_command',
      description: 'Run a shell command inside the sandbox',
      schema: {
        type: 'object',
        properties: {
          command: {
            type: 'string',
            description: 'The command to run'
          },
          workingDir: {
            type: 'string',
            description: 'Working directory for the command'
          }
        },
        required: ['command']
      },
      execute: this._executeCommandTool.bind(this)
    });

    // HTTP request tool
    this.registerTool('http_request', {
      name: 'http_request',
      description: 'Send an HTTP request',
      schema: {
        type: 'object',
        properties: {
          url: {
            type: 'string',
            description: 'Request URL'
          },
          method: {
            type: 'string',
            enum: ['GET', 'POST', 'PUT', 'DELETE'],
            default: 'GET',
            description: 'HTTP method'
          },
          headers: {
            type: 'object',
            description: 'HTTP request headers'
          },
          data: {
            type: 'object',
            description: 'Request body data'
          }
        },
        required: ['url']
      },
      execute: this._executeHttpRequestTool.bind(this)
    });

    // Workspace tool
    this.registerTool('workspace', {
      name: 'workspace',
      description: 'Manage the project workspace',
      schema: {
        type: 'object',
        properties: {
          operation: {
            type: 'string',
            enum: ['init', 'status', 'save', 'restore'],
            description: 'Workspace operation'
          },
          name: {
            type: 'string',
            description: 'Workspace name'
          }
        },
        required: ['operation']
      },
      execute: this._executeWorkspaceTool.bind(this)
    });
  }

  /**
   * Register a tool
   * @param {String} id - Tool ID
   * @param {Object} tool - Tool configuration
   */
  registerTool(id, tool) {
    this.tools.set(id, tool);
    this.emit('tool_registered', id);
    logger.debug(`Registered tool: ${id}`);
  }

  /**
   * List the tools that are currently enabled
   * @returns {Array} - Tool list
   */
  getAvailableTools() {
    return Array.from(this.tools.entries())
      .filter(([id]) => !this.disabledTools.has(id))
      .map(([id, tool]) => ({
        name: tool.name,
        description: tool.description,
        schema: tool.schema
      }));
  }

  /**
   * Get a tool definition
   * @param {String} toolId - Tool ID
   * @returns {Object|null} - Tool definition
   */
  getToolDefinition(toolId) {
    const tool = this.tools.get(toolId);
    if (!tool) return null;

    return {
      name: tool.name,
      description: tool.description,
      schema: tool.schema
    };
  }

  /**
   * Execute a tool
   * @param {String} toolId - Tool ID
   * @param {Object} args - Tool arguments
   * @param {Object} context - Execution context
   * @returns {Promise<Object>} - Execution result
   */
  async executeTool(toolId, args, context = {}) {
    const tool = this.tools.get(toolId);
    if (!tool) {
      throw new Error(`Tool ${toolId} not found`);
    }
    if (this.disabledTools.has(toolId)) {
      throw new Error(`Tool ${toolId} is disabled`);
    }

    // Record the tool call
    this.emit('tool_execution_start', { toolId, args, context });
    logger.debug(`Executing tool ${toolId} with args:`, args);

    try {
      // Run the tool
      const result = await tool.execute(args, context);

      // Record the result
      this.emit('tool_execution_complete', { toolId, args, context, result });
      return result;
    } catch (error) {
      // Record the failure
      this.emit('tool_execution_error', { toolId, args, context, error });
      logger.error(`Error executing tool ${toolId}:`, error);
      throw error;
    }
  }

  /**
   * Execute the file system tool
   * @private
   * @param {Object} args - Tool arguments
   * @param {Object} context - Execution context
   * @returns {Promise<Object>} - Execution result
   */
  async _executeFileSystemTool(args, context) {
    const { operation, path: filePath, content } = args;
    const { sandboxId } = context;

    if (!sandboxId) {
      throw new Error('Sandbox ID is required for file system operations');
    }

    // Paths come from the LLM or the API caller, so they are quoted before
    // being interpolated into a shell command; "--" stops option parsing.
    const quotedPath = shellQuote(filePath);

    switch (operation) {
      case 'read': {
        // Read the file inside the sandbox
        const readResult = await dockerService.execInSandbox(
          sandboxId,
          `cat -- ${quotedPath}`,
          { workingDir: '/workspace' }
        );
        if (readResult.exitCode !== 0) {
          throw new Error(`Failed to read file: ${readResult.stderr}`);
        }
        return { content: readResult.stdout };
      }

      case 'write': {
        // Write the content to a temporary file on the host; the random
        // suffix avoids collisions between writes in the same millisecond
        const suffix = Math.random().toString(36).slice(2);
        const tempFile = `/tmp/file-${Date.now()}-${suffix}`;
        await fs.writeFile(tempFile, content || '');
        try {
          // Copy the temporary file into the sandbox
          await dockerService.copyToSandbox(sandboxId, tempFile, filePath);
        } finally {
          // Clean up the temporary file even if the copy fails
          await fs.unlink(tempFile);
        }
        return { success: true };
      }

      case 'list': {
        // List the directory inside the sandbox
        const listResult = await dockerService.execInSandbox(
          sandboxId,
          `ls -la -- ${quotedPath}`,
          { workingDir: '/workspace' }
        );
        if (listResult.exitCode !== 0) {
          throw new Error(`Failed to list directory: ${listResult.stderr}`);
        }
        return { output: listResult.stdout };
      }

      case 'exists': {
        // Check whether the path exists
        const existsResult = await dockerService.execInSandbox(
          sandboxId,
          `test -e ${quotedPath} && echo "exists" || echo "not exists"`,
          { workingDir: '/workspace' }
        );
        return { exists: existsResult.stdout.trim() === 'exists' };
      }

      case 'delete': {
        // Delete the file or directory
        const deleteResult = await dockerService.execInSandbox(
          sandboxId,
          `rm -rf -- ${quotedPath}`,
          { workingDir: '/workspace' }
        );
        if (deleteResult.exitCode !== 0) {
          throw new Error(`Failed to delete file: ${deleteResult.stderr}`);
        }
        return { success: true };
      }

      default:
        throw new Error(`Unsupported file system operation: ${operation}`);
    }
  }

  /**
   * Execute the command tool
   * @private
   * @param {Object} args - Tool arguments
   * @param {Object} context - Execution context
   * @returns {Promise<Object>} - Execution result
   */
  async _executeCommandTool(args, context) {
    const { command, workingDir = '/workspace' } = args;
    const { sandboxId } = context;

    if (!sandboxId) {
      throw new Error('Sandbox ID is required for command execution');
    }

    // Run the command inside the sandbox
    const result = await dockerService.execInSandbox(sandboxId, command, { workingDir });

    return {
      stdout: result.stdout,
      stderr: result.stderr,
      exitCode: result.exitCode
    };
  }

  /**
   * Execute the HTTP request tool
   * @private
   * @param {Object} args - Tool arguments
   * @param {Object} context - Execution context
   * @returns {Promise<Object>} - Execution result
   */
  async _executeHttpRequestTool(args, context) {
    const { url, method = 'GET', headers = {}, data } = args;

    // Send the request with axios. Note that this runs on the host, not in
    // the sandbox, and the URL is not restricted in any way.
    const axios = require('axios');

    try {
      const response = await axios({
        method,
        url,
        headers,
        data,
        timeout: 10000,
        maxRedirects: 3
      });

      return {
        status: response.status,
        statusText: response.statusText,
        headers: response.headers,
        data: response.data
      };
    } catch (error) {
      // A non-2xx response is reported as a result rather than thrown
      if (error.response) {
        return {
          error: true,
          status: error.response.status,
          statusText: error.response.statusText,
          headers: error.response.headers,
          data: error.response.data
        };
      }
      throw error;
    }
  }

  /**
   * Execute the workspace tool
   * @private
   * @param {Object} args - Tool arguments
   * @param {Object} context - Execution context
   * @returns {Promise<Object>} - Execution result
   */
  async _executeWorkspaceTool(args, context) {
    const { operation, name = 'default' } = args;
    const { sandboxId, sessionId } = context;

    if (!sandboxId) {
      throw new Error('Sandbox ID is required for workspace operations');
    }

    // The name becomes part of a host file path, so reject anything that
    // could escape the workspaces directory (for example "../")
    if (!/^[A-Za-z0-9_-]+$/.test(name)) {
      throw new Error(`Invalid workspace name: ${name}`);
    }

    const workspacesDir = path.join(process.cwd(), '.openhands', 'workspaces');
    await ensureDirectoryExists(workspacesDir);

    switch (operation) {
      case 'init':
        // Initialize the workspace
        await dockerService.execInSandbox(sandboxId, 'mkdir -p /workspace', {});
        return { success: true };

      case 'status': {
        // Report the workspace status
        const statusResult = await dockerService.execInSandbox(
          sandboxId,
          'find /workspace -type f | wc -l',
          {}
        );
        return {
          fileCount: parseInt(statusResult.stdout.trim(), 10) || 0,
          sandbox: sandboxId,
          session: sessionId
        };
      }

      case 'save': {
        // Save the workspace
        const workspaceFile = path.join(workspacesDir, `${name}.tar`);

        // Create the archive inside the sandbox
        await dockerService.execInSandbox(
          sandboxId,
          'tar -cf /tmp/workspace.tar -C /workspace .',
          {}
        );

        // Copy the archive out of the sandbox
        await dockerService.copyFromSandbox(sandboxId, '/tmp/workspace.tar', workspaceFile);

        return { success: true, name, path: workspaceFile };
      }

      case 'restore': {
        // Restore the workspace
        const restoreFile = path.join(workspacesDir, `${name}.tar`);

        // Check that the archive exists
        try {
          await fs.access(restoreFile);
        } catch (error) {
          throw new Error(`Workspace ${name} does not exist`);
        }

        // Copy the archive into the sandbox
        await dockerService.copyToSandbox(sandboxId, restoreFile, '/tmp/workspace.tar');

        // Unpack the archive inside the sandbox
        await dockerService.execInSandbox(
          sandboxId,
          'tar -xf /tmp/workspace.tar -C /workspace',
          {}
        );

        return { success: true, name };
      }

      default:
        throw new Error(`Unsupported workspace operation: ${operation}`);
    }
  }

  /**
   * Enable a tool
   * @param {String} toolId - Tool ID
   * @returns {Boolean} - Whether the tool was found and enabled
   */
  enableTool(toolId) {
    if (!this.tools.has(toolId)) {
      return false;
    }
    this.disabledTools.delete(toolId);
    this.emit('tool_enabled', toolId);
    logger.debug(`Enabled tool: ${toolId}`);
    return true;
  }

  /**
   * Disable a tool
   * @param {String} toolId - Tool ID
   * @returns {Boolean} - Whether the tool was found and disabled
   */
  disableTool(toolId) {
    if (!this.tools.has(toolId)) {
      return false;
    }
    this.disabledTools.add(toolId);
    this.emit('tool_disabled', toolId);
    logger.debug(`Disabled tool: ${toolId}`);
    return true;
  }

  /**
   * Shut down the tool service
   */
  async shutdown() {
    logger.info('Tool service shutdown');
  }
}

// Create the singleton instance
const toolService = new ToolService();

module.exports = toolService;
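The register, execute, and disable flow of the tool service can be tried in isolation. The following is a minimal sketch, not the service itself: `MiniToolRegistry`, the `echo` tool, and `auditLog` are hypothetical names introduced for illustration, and the sandbox is left out entirely.

```javascript
const { EventEmitter } = require('events');

// Hypothetical, stripped-down registry that mirrors the
// register / execute / disable flow of the tool service.
class MiniToolRegistry extends EventEmitter {
  constructor() {
    super();
    this.tools = new Map();
    this.disabled = new Set();
  }

  register(id, tool) {
    this.tools.set(id, tool);
    this.emit('tool_registered', id);
  }

  disable(id) {
    if (!this.tools.has(id)) return false;
    this.disabled.add(id);
    return true;
  }

  async execute(id, args = {}, context = {}) {
    const tool = this.tools.get(id);
    if (!tool) throw new Error(`Tool ${id} not found`);
    if (this.disabled.has(id)) throw new Error(`Tool ${id} is disabled`);

    this.emit('tool_execution_start', { id, args });
    const result = await tool.execute(args, context);
    this.emit('tool_execution_complete', { id, result });
    return result;
  }
}

// Usage: register a hypothetical "echo" tool and audit calls through events.
const registry = new MiniToolRegistry();
const auditLog = [];
registry.on('tool_execution_start', ({ id }) => auditLog.push(`start:${id}`));
registry.on('tool_execution_complete', ({ id }) => auditLog.push(`done:${id}`));

registry.register('echo', {
  name: 'echo',
  description: 'Returns its input unchanged',
  execute: async (args) => ({ echoed: args.text })
});
```

Because listeners are attached from outside, auditing or metrics can be added without touching the registry, which is the main reason for extending `EventEmitter` here.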
4.3 Implementing the Controller Layer
chatController.js - chat controller:
// src/controllers/chatController.js
const { validationResult } = require('express-validator');
const logger = require('../config/logger');
const agentService = require('../services/agentService');

/**
 * Send a message
 * @param {Object} req - Request object
 * @param {Object} res - Response object
 * @param {Function} next - Next middleware
 */
exports.sendMessage = async (req, res, next) => {
  try {
    const errors = validationResult(req);
    if (!errors.isEmpty()) {
      return res.status(400).json({ errors: errors.array() });
    }

    const { message, conversationId } = req.body;

    // Create a new session when no conversation ID is supplied
    let sessionId = conversationId;
    if (!sessionId) {
      sessionId = await agentService.createSession();
    }

    // Process the user input
    const response = await agentService.processInput(sessionId, message);

    res.json({
      success: true,
      sessionId,
      content: response.content,
      type: response.type,
      toolCalls: response.toolCalls,
      toolResults: response.toolResults
    });
  } catch (error) {
    logger.error('Error in sendMessage controller:', error);
    next(error);
  }
};

/**
 * Execute a command
 * @param {Object} req - Request object
 * @param {Object} res - Response object
 * @param {Function} next - Next middleware
 */
exports.executeCommand = async (req, res, next) => {
  try {
    const errors = validationResult(req);
    if (!errors.isEmpty()) {
      return res.status(400).json({ errors: errors.array() });
    }

    const { command, conversationId } = req.body;

    if (!conversationId) {
      return res.status(400).json({
        success: false,
        error: 'Conversation ID is required'
      });
    }

    // Run the command
    const result = await agentService.executeCommand(conversationId, command);

    res.json({
      success: true,
      sessionId: conversationId,
      command,
      output: result.result.stdout,
      error: result.result.stderr,
      exitCode: result.result.exitCode
    });
  } catch (error) {
    logger.error('Error in executeCommand controller:', error);
    next(error);
  }
};

/**
 * Get the conversation history
 * @param {Object} req - Request object
 * @param {Object} res - Response object
 * @param {Function} next - Next middleware
 */
exports.getHistory = async (req, res, next) => {
  try {
    const { conversationId } = req.params;

    if (!conversationId) {
      return res.status(400).json({
        success: false,
        error: 'Conversation ID is required'
      });
    }

    // Fetch the session history
    const history = agentService.getSessionHistory(conversationId);

    res.json({
      success: true,
      sessionId: conversationId,
      history
    });
  } catch (error) {
    logger.error('Error in getHistory controller:', error);
    next(error);
  }
};

/**
 * Create a new conversation
 * @param {Object} req - Request object
 * @param {Object} res - Response object
 * @param {Function} next - Next middleware
 */
exports.createConversation = async (req, res, next) => {
  try {
    const options = req.body || {};

    // Create a new session
    const sessionId = await agentService.createSession(options);

    res.json({
      success: true,
      sessionId
    });
  } catch (error) {
    logger.error('Error in createConversation controller:', error);
    next(error);
  }
};

/**
 * Delete a conversation
 * @param {Object} req - Request object
 * @param {Object} res - Response object
 * @param {Function} next - Next middleware
 */
exports.deleteConversation = async (req, res, next) => {
  try {
    const { conversationId } = req.params;

    if (!conversationId) {
      return res.status(400).json({
        success: false,
        error: 'Conversation ID is required'
      });
    }

    // Delete the session
    const result = await agentService.deleteSession(conversationId);

    res.json({
      success: result
    });
  } catch (error) {
    logger.error('Error in deleteConversation controller:', error);
    next(error);
  }
};

/**
 * List conversations
 * @param {Object} req - Request object
 * @param {Object} res - Response object
 * @param {Function} next - Next middleware
 */
exports.listConversations = async (req, res, next) => {
  try {
    // Fetch the session list
    const sessions = agentService.listSessions();

    res.json({
      success: true,
      sessions
    });
  } catch (error) {
    logger.error('Error in listConversations controller:', error);
    next(error);
  }
};
toolController.js - tool controller:
// src/controllers/toolController.js
const { validationResult } = require('express-validator');
const logger = require('../config/logger');
const toolService = require('../services/toolService');

/**
 * List the available tools
 * @param {Object} req - Request object
 * @param {Object} res - Response object
 * @param {Function} next - Next middleware
 */
exports.getAvailableTools = async (req, res, next) => {
  try {
    // Fetch the list of enabled tools
    const tools = toolService.getAvailableTools();

    res.json({
      success: true,
      tools
    });
  } catch (error) {
    logger.error('Error in getAvailableTools controller:', error);
    next(error);
  }
};

/**
 * Get a tool definition
 * @param {Object} req - Request object
 * @param {Object} res - Response object
 * @param {Function} next - Next middleware
 */
exports.getToolDefinition = async (req, res, next) => {
  try {
    const { toolId } = req.params;

    if (!toolId) {
      return res.status(400).json({
        success: false,
        error: 'Tool ID is required'
      });
    }

    // Fetch the tool definition
    const definition = toolService.getToolDefinition(toolId);

    if (!definition) {
      return res.status(404).json({
        success: false,
        error: 'Tool not found'
      });
    }

    res.json({
      success: true,
      tool: definition
    });
  } catch (error) {
    logger.error('Error in getToolDefinition controller:', error);
    next(error);
  }
};

/**
 * Execute a tool
 * @param {Object} req - Request object
 * @param {Object} res - Response object
 * @param {Function} next - Next middleware
 */
exports.executeTool = async (req, res, next) => {
  try {
    const errors = validationResult(req);
    if (!errors.isEmpty()) {
      return res.status(400).json({ errors: errors.array() });
    }

    const { toolId } = req.params;
    const { args, context } = req.body;

    if (!toolId) {
      return res.status(400).json({
        success: false,
        error: 'Tool ID is required'
      });
    }

    // Run the tool
    const result = await toolService.executeTool(toolId, args, context);

    res.json({
      success: true,
      toolId,
      result
    });
  } catch (error) {
    logger.error('Error in executeTool controller:', error);
    next(error);
  }
};

/**
 * Enable a tool
 * @param {Object} req - Request object
 * @param {Object} res - Response object
 * @param {Function} next - Next middleware
 */
exports.enableTool = async (req, res, next) => {
  try {
    const { toolId } = req.params;

    if (!toolId) {
      return res.status(400).json({
        success: false,
        error: 'Tool ID is required'
      });
    }

    // Enable the tool; an unknown ID is reported as 404, matching getToolDefinition
    const result = toolService.enableTool(toolId);
    if (!result) {
      return res.status(404).json({
        success: false,
        error: 'Tool not found'
      });
    }

    res.json({
      success: true
    });
  } catch (error) {
    logger.error('Error in enableTool controller:', error);
    next(error);
  }
};

/**
 * Disable a tool
 * @param {Object} req - Request object
 * @param {Object} res - Response object
 * @param {Function} next - Next middleware
 */
exports.disableTool = async (req, res, next) => {
  try {
    const { toolId } = req.params;

    if (!toolId) {
      return res.status(400).json({
        success: false,
        error: 'Tool ID is required'
      });
    }

    // Disable the tool; an unknown ID is reported as 404, matching getToolDefinition
    const result = toolService.disableTool(toolId);
    if (!result) {
      return res.status(404).json({
        success: false,
        error: 'Tool not found'
      });
    }

    res.json({
      success: true
    });
  } catch (error) {
    logger.error('Error in disableTool controller:', error);
    next(error);
  }
};
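The `executeTool` handler forwards `args` to the service without checking them against the `schema` that each tool declares. One way to close that gap is sketched below, assuming only the small JSON Schema subset the built-in tools use (`required`, `type: 'string'`, `type: 'object'`, and `enum`). `validateToolArgs` is a hypothetical helper, not part of the project; a full JSON Schema validator library would be the more robust choice.

```javascript
// Hypothetical helper: validates args against the subset of JSON Schema
// used by the built-in tool definitions. Returns a list of error strings;
// an empty list means the arguments are acceptable.
function validateToolArgs(schema, args) {
  if (args === null || typeof args !== 'object' || Array.isArray(args)) {
    return ['args must be an object'];
  }

  const errors = [];

  // Every required field must be present
  for (const key of schema.required || []) {
    if (args[key] === undefined) {
      errors.push(`missing required field: ${key}`);
    }
  }

  // Check type and enum constraints for the fields that were supplied
  for (const [key, rule] of Object.entries(schema.properties || {})) {
    const value = args[key];
    if (value === undefined) continue;

    if (rule.type === 'string' && typeof value !== 'string') {
      errors.push(`${key} must be a string`);
    }
    const isPlainObject =
      value !== null && typeof value === 'object' && !Array.isArray(value);
    if (rule.type === 'object' && !isPlainObject) {
      errors.push(`${key} must be an object`);
    }
    if (rule.enum && !rule.enum.includes(value)) {
      errors.push(`${key} must be one of: ${rule.enum.join(', ')}`);
    }
  }

  return errors;
}

// The schema of the file_system tool, copied from the tool service
const fileSystemSchema = {
  type: 'object',
  properties: {
    operation: { type: 'string', enum: ['read', 'write', 'list', 'exists', 'delete'] },
    path: { type: 'string' },
    content: { type: 'string' }
  },
  required: ['operation', 'path']
};

console.log(validateToolArgs(fileSystemSchema, { operation: 'read', path: 'a.txt' }));
console.log(validateToolArgs(fileSystemSchema, { operation: 'chmod' }));
```

A controller could call such a helper before `toolService.executeTool` and answer with status 400 when the returned list is non-empty.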
configController.js - configuration controller:
// src/controllers/configController.js
const { validationResult } = require('express-validator');
const logger = require('../config/logger');
const llmService = require('../services/llmService');
const config = require('../config/config');
const fs = require('fs').promises;
const path = require('path');

/**
 * Get the configuration
 * @param {Object} req - Request object
 * @param {Object} res - Response object
 * @param {Function} next - Next middleware
 */
exports.getConfig = async (req, res, next) => {
  try {
    // Look up the available LLM providers
    const providers = llmService.getAvailableProviders();
    const defaultProvider = llmService.getDefaultProvider();

    // Expose only non-sensitive settings
    const safeConfig = {
      version: config.version,
      environment: config.environment,
      sandbox: {
        enabled: config.sandbox.enabled,
        image: config.sandbox.image
      },
      llm: {
        providers,
        defaultProvider
      }
    };

    res.json({
      success: true,
      config: safeConfig
    });
  } catch (error) {
    logger.error('Error in getConfig controller:', error);
    next(error);
  }
};

/**
 * Update an API key
 * @param {Object} req - Request object
 * @param {Object} res - Response object
 * @param {Function} next - Next middleware
 */
exports.updateApiKey = async (req, res, next) => {
  try {
    const errors = validationResult(req);
    if (!errors.isEmpty()) {
      return res.status(400).json({ errors: errors.array() });
    }

    const { provider, apiKey } = req.body;

    if (!provider) {
      return res.status(400).json({
        success: false,
        error: 'Provider is required'
      });
    }

    // Update the API key in memory
    llmService.setApiKey(provider, apiKey);

    // Persist it to the configuration file
    const configDir = path.join(process.cwd(), '.openhands', 'config');
    await fs.mkdir(configDir, { recursive: true });

    const keysFile = path.join(configDir, 'api-keys.json');
    let keys = {};

    try {
      const keysData = await fs.readFile(keysFile, 'utf8');
      keys = JSON.parse(keysData);
    } catch (error) {
      // A missing file is expected on first use; anything else (for example
      // a corrupt file) is logged before starting over with an empty object
      if (error.code !== 'ENOENT') {
        logger.warn(`Could not read ${keysFile}, starting with an empty key store`);
      }
    }

    keys[provider] = apiKey;

    // Keys are stored in plain text, so the file is created readable by the
    // owner only (the mode applies when the file is first created)
    await fs.writeFile(keysFile, JSON.stringify(keys, null, 2), {
      encoding: 'utf8',
      mode: 0o600
    });

    res.json({
      success: true,
      provider
    });
  } catch (error) {
    logger.error('Error in updateApiKey controller:', error);
    next(error);
  }
};

/**
 * Set the default LLM provider
 * @param {Object} req - Request object
 * @param {Object} res - Response object
 * @param {Function} next - Next middleware
 */
exports.setDefaultProvider = async (req, res, next) => {
  try {
    const errors = validationResult(req);
    if (!errors.isEmpty()) {
      return res.status(400).json({ errors: errors.array() });
    }

    const { provider } = req.body;

    if (!provider) {
      return res.status(400).json({
        success: false,
        error: 'Provider is required'
      });
    }

    // Set the default provider
    llmService.setDefaultProvider(provider);

    res.json({
      success: true,
      defaultProvider: provider
    });
  } catch (error) {
    logger.error('Error in setDefaultProvider controller:', error);
    next(error);
  }
};
4.4 Implementing the Routing Layer
chatRoutes.js - chat routes:
// src/routes/chatRoutes.js
const express = require('express');
const { body, param } = require('express-validator');
const chatController = require('../controllers/chatController');

const router = express.Router();

/**
 * @route POST /api/chat
 * @desc Send a message
 * @access Public
 */
router.post(
  '/',
  [body('message').notEmpty().withMessage('Message is required')],
  chatController.sendMessage
);

/**
 * @route POST /api/chat/execute
 * @desc Execute a command
 * @access Public
 */
router.post(
  '/execute',
  [
    body('command').notEmpty().withMessage('Command is required'),
    body('conversationId').notEmpty().withMessage('Conversation ID is required')
  ],
  chatController.executeCommand
);

/**
 * @route GET /api/chat/conversations
 * @desc List conversations
 * @access Public
 */
router.get('/conversations', chatController.listConversations);

/**
 * @route POST /api/chat/conversations
 * @desc Create a new conversation
 * @access Public
 */
router.post('/conversations', chatController.createConversation);

/**
 * @route GET /api/chat/conversations/:conversationId
 * @desc Get the conversation history
 * @access Public
 */
router.get(
  '/conversations/:conversationId',
  [param('conversationId').notEmpty().withMessage('Conversation ID is required')],
  chatController.getHistory
);

/**
 * @route DELETE /api/chat/conversations/:conversationId
 * @desc Delete a conversation
 * @access Public
 */
router.delete(
  '/conversations/:conversationId',
  [param('conversationId').notEmpty().withMessage('Conversation ID is required')],
  chatController.deleteConversation
);

module.exports = router;
toolRoutes.js - tool routes:
// src/routes/toolRoutes.js
const express = require('express');
const { body, param } = require('express-validator');
const toolController = require('../controllers/toolController');

const router = express.Router();

/**
 * @route GET /api/tools
 * @desc List the available tools
 * @access Public
 */
router.get('/', toolController.getAvailableTools);

/**
 * @route GET /api/tools/:toolId
 * @desc Get a tool definition
 * @access Public
 */
router.get(
  '/:toolId',
  [param('toolId').notEmpty().withMessage('Tool ID is required')],
  toolController.getToolDefinition
);

/**
 * @route POST /api/tools/:toolId/execute
 * @desc Execute a tool
 * @access Public
 */
router.post(
  '/:toolId/execute',
  [
    param('toolId').notEmpty().withMessage('Tool ID is required'),
    body('args').notEmpty().withMessage('Arguments are required')
  ],
  toolController.executeTool
);

/**
 * @route PUT /api/tools/:toolId/enable
 * @desc Enable a tool
 * @access Public
 */
router.put(
  '/:toolId/enable',
  [param('toolId').notEmpty().withMessage('Tool ID is required')],
  toolController.enableTool
);

/**
 * @route PUT /api/tools/:toolId/disable
 * @desc Disable a tool
 * @access Public
 */
router.put(
  '/:toolId/disable',
  [param('toolId').notEmpty().withMessage('Tool ID is required')],
  toolController.disableTool
);

module.exports = router;
configRoutes.js - configuration routes:
// src/routes/configRoutes.js
const express = require('express');
const { body } = require('express-validator');
const configController = require('../controllers/configController');

const router = express.Router();

/**
 * @route GET /api/config
 * @desc Get the configuration
 * @access Public
 */
router.get('/', configController.getConfig);

/**
 * @route POST /api/config/api-key
 * @desc Update an API key
 * @access Public
 */
router.post(
  '/api-key',
  [
    body('provider').notEmpty().withMessage('Provider is required'),
    body('apiKey').notEmpty().withMessage('API key is required')
  ],
  configController.updateApiKey
);

/**
 * @route POST /api/config/default-provider
 * @desc Set the default LLM provider
 * @access Public
 */
router.post(
  '/default-provider',
  [body('provider').notEmpty().withMessage('Provider is required')],
  configController.setDefaultProvider
);

module.exports = router;
4.5 Implementing the Application Entry Point
app.js - application entry point:
// src/app.js
const express = require('express');
const cors = require('cors');
const helmet = require('helmet');
const compression = require('compression');
const morgan = require('morgan');
const path = require('path');
// The callback/stream API is needed here: createWriteStream does not exist on fs.promises
const fs = require('fs');
const config = require('./config/config');
const logger = require('./config/logger');
const errorHandler = require('./middlewares/errorHandler');
const requestLogger = require('./middlewares/requestLogger');
const agentService = require('./services/agentService');
const chatRoutes = require('./routes/chatRoutes');
const toolRoutes = require('./routes/toolRoutes');
const configRoutes = require('./routes/configRoutes');

// Create the Express app
const app = express();

// Core middleware
app.use(helmet());
app.use(compression());
app.use(cors());
app.use(express.json({ limit: '10mb' }));
app.use(express.urlencoded({ extended: true, limit: '10mb' }));

// HTTP access logging
if (config.environment === 'production') {
  // In production, append access logs to a file. The directory is created
  // synchronously so that morgan is registered before the routes below;
  // middleware added later from an async callback would sit behind the
  // routes and the error handler and never see a request.
  try {
    const logDir = path.join(process.cwd(), 'logs');
    fs.mkdirSync(logDir, { recursive: true });
    const accessLogStream = fs.createWriteStream(
      path.join(logDir, 'access.log'),
      { flags: 'a' }
    );
    app.use(morgan('combined', { stream: accessLogStream }));
  } catch (err) {
    logger.error('Failed to create logs directory:', err);
    app.use(morgan('combined'));
  }
} else {
  // In development, use the concise log format
  app.use(morgan('dev'));
}

// Request ID and request logging middleware
app.use(requestLogger);

// Mount the API routes
app.use('/api/chat', chatRoutes);
app.use('/api/tools', toolRoutes);
app.use('/api/config', configRoutes);

// Static file serving
app.use(express.static(path.join(__dirname, 'public')));

// Error handling (must be registered last)
app.use(errorHandler);

const port = config.port || 3000;
let server;

// Startup
async function startServer() {
  try {
    // Initialize the agent service
    await agentService.initialize();

    // Start the HTTP server
    server = app.listen(port, () => {
      logger.info(`Server running on port ${port}`);
    });

    // After an uncaught exception the process state is unreliable,
    // so log the error and exit instead of continuing to serve
    process.on('uncaughtException', (error) => {
      logger.error('Uncaught Exception:', error);
      process.exit(1);
    });

    // Log unhandled promise rejections
    process.on('unhandledRejection', (reason, promise) => {
      logger.error('Unhandled Promise Rejection:', reason);
    });

    // Handle termination signals
    process.on('SIGTERM', gracefulShutdown);
    process.on('SIGINT', gracefulShutdown);
  } catch (error) {
    logger.error('Failed to start server:', error);
    process.exit(1);
  }
}

// Graceful shutdown
async function gracefulShutdown() {
  logger.info('Shutting down server...');

  if (server) {
    server.close(async () => {
      logger.info('HTTP server closed');

      // Shut down the agent service
      try {
        await agentService.shutdown();
        logger.info('Agent service shut down successfully');
      } catch (error) {
        logger.error('Error during agent service shutdown:', error);
      }

      process.exit(0);
    });

    // Force an exit if the server has not closed within 10 seconds
    setTimeout(() => {
      logger.error('Forcing server shutdown after timeout');
      process.exit(1);
    }, 10000);
  } else {
    process.exit(0);
  }
}

// Start the server
startServer().catch((error) => {
  logger.error('Failed to start server:', error);
  process.exit(1);
});

module.exports = app;
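The shutdown logic in app.js races `server.close()` against a 10-second timer. The same idea can be written as a small promise-based helper, which makes it easier to test. This is a sketch only: `closeWithTimeout` is a hypothetical name, and it assumes nothing about the server beyond a `close(callback)` method, which is what Node's `http.Server` provides.

```javascript
// Hypothetical helper: resolves to true when the server closes cleanly,
// or to false when the timeout fires first.
function closeWithTimeout(server, timeoutMs) {
  return new Promise((resolve) => {
    const timer = setTimeout(() => resolve(false), timeoutMs);

    // http.Server#close stops accepting new connections and invokes the
    // callback once the existing connections have ended
    server.close(() => {
      clearTimeout(timer);
      resolve(true);
    });
  });
}

// Usage sketch inside a signal handler (the server variable is assumed to
// hold the value returned by app.listen):
//
//   process.on('SIGTERM', async () => {
//     const clean = await closeWithTimeout(server, 10000);
//     process.exit(clean ? 0 : 1);
//   });
```

Returning a boolean instead of calling `process.exit` inside the helper keeps the decision about the exit code in one place.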
config.js - configuration file:
// src/config/config.js
const path = require('path');
const dotenv = require('dotenv');

// Load environment variables from .env
dotenv.config();

const config = {
  // Application info
  name: 'OpenHands Backend',
  version: '1.0.0',
  description: 'Backend service for the OpenHands AI development assistant',

  // Environment
  environment: process.env.NODE_ENV || 'development',
  port: parseInt(process.env.PORT || '3000', 10),

  // Logging
  logs: {
    level: process.env.LOG_LEVEL || 'info',
    file: process.env.LOG_FILE || path.join(process.cwd(), 'logs', 'app.log')
  },

  // Sandbox
  sandbox: {
    enabled: process.env.SANDBOX_ENABLED !== 'false',
    image: process.env.SANDBOX_IMAGE || 'docker.all-hands.dev/all-hands-ai/runtime:0.47-nikolaik',
    cpus: process.env.SANDBOX_CPUS || '1',
    memory: process.env.SANDBOX_MEMORY || '1g',
    pidsLimit: parseInt(process.env.SANDBOX_PIDS_LIMIT || '100', 10),
    network: process.env.SANDBOX_NETWORK || 'bridge'
  },

  // LLM providers
  llm: {
    anthropic: {
      apiKey: process.env.ANTHROPIC_API_KEY || '',
      model: process.env.ANTHROPIC_MODEL || 'claude-sonnet-4-20250514'
    },
    openai: {
      apiKey: process.env.OPENAI_API_KEY || '',
      model: process.env.OPENAI_MODEL || 'gpt-4o'
    }
  },

  // Sessions
  session: {
    maxHistory: parseInt(process.env.MAX_HISTORY || '200', 10),
    saveInterval: parseInt(process.env.SAVE_INTERVAL || '60000', 10)
  },

  // Tools
  tools: {
    disabledTools: (process.env.DISABLED_TOOLS || '').split(',').filter(Boolean)
  }
};

module.exports = config;
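One caveat with the `parseInt(process.env.X || 'default', 10)` pattern in config.js: a malformed value such as `PORT=abc` silently becomes `NaN`. A stricter alternative is sketched below; `intFromEnv` is a hypothetical helper introduced for illustration, and it takes the environment object as a parameter so that it can be exercised without touching `process.env`.

```javascript
// Hypothetical helper: reads an integer setting and fails fast on
// malformed values instead of letting NaN leak into the config object.
function intFromEnv(env, name, fallback) {
  const raw = env[name];

  // Unset or empty variables fall back to the default
  if (raw === undefined || raw === '') return fallback;

  const value = Number.parseInt(raw, 10);
  if (Number.isNaN(value)) {
    throw new Error(`Environment variable ${name} must be an integer, got "${raw}"`);
  }
  return value;
}

// Usage sketch with an explicit environment object
const exampleEnv = { PORT: '8080' };
console.log(intFromEnv(exampleEnv, 'PORT', 3000));        // value taken from exampleEnv
console.log(intFromEnv(exampleEnv, 'MAX_HISTORY', 200));  // falls back to the default
```

Failing at startup is usually preferable here, since a `NaN` port or history limit would otherwise surface much later and far from its cause.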
logger.js - logging configuration:
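// src/config/logger.js
const fs = require('fs');
const path = require('path');
const winston = require('winston');
const config = require('./config');

// Make sure the log directory exists (a no-op when it already does)
const logDir = path.dirname(config.logs.file);
fs.mkdirSync(logDir, { recursive: true });

// Upper-case the level before any colorizing happens; upper-casing a
// colorized level afterwards would corrupt the ANSI escape codes
const upperCaseLevel = winston.format((info) => {
  info.level = info.level.toUpperCase();
  return info;
});

// Render one log line and append the stack trace when there is one
const lineFormat = winston.format.printf((info) => {
  let logMsg = `${info.timestamp} [${info.level}]: ${info.message}`;
  if (info.stack) {
    logMsg += `\n${info.stack}`;
  }
  return logMsg;
});

// Shared formatting steps, applied once per log entry
const baseFormat = winston.format.combine(
  winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
  winston.format.errors({ stack: true }),
  upperCaseLevel()
);

// Create the logger instance
const logger = winston.createLogger({
  level: config.logs.level,
  format: baseFormat,
  transports: [
    // Console output (colorized)
    new winston.transports.Console({
      format: winston.format.combine(winston.format.colorize(), lineFormat)
    }),
    // File output (plain text, rotated)
    new winston.transports.File({
      filename: config.logs.file,
      format: lineFormat,
      maxsize: 5242880, // 5 MB
      maxFiles: 5
    })
  ]
});

module.exports = logger;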
errorHandler.js - error-handling middleware:
// src/middlewares/errorHandler.js
const logger = require('../config/logger');

/**
 * Error-handling middleware
 * @param {Error} err - Error object
 * @param {Object} req - Request object
 * @param {Object} res - Response object
 * @param {Function} next - Next middleware
 */
const errorHandler = (err, req, res, next) => {
  // Log the error
  logger.error(`Error on ${req.method} ${req.path}: ${err.message}`, {
    requestId: req.requestId,
    error: err
  });

  // If the response has already started, delegate to Express's default handler
  if (res.headersSent) {
    return next(err);
  }

  // Map known error types to status codes
  if (err.name === 'ValidationError') {
    return res.status(400).json({
      success: false,
      error: 'Validation Error',
      details: err.details || err.message
    });
  }

  if (err.name === 'UnauthorizedError') {
    return res.status(401).json({
      success: false,
      error: 'Unauthorized',
      details: err.message
    });
  }

  if (err.name === 'ForbiddenError') {
    return res.status(403).json({
      success: false,
      error: 'Forbidden',
      details: err.message
    });
  }

  if (err.name === 'NotFoundError') {
    return res.status(404).json({
      success: false,
      error: 'Not Found',
      details: err.message
    });
  }

  // Anything else is treated as a server error; the message is hidden in production
  return res.status(500).json({
    success: false,
    error: 'Internal Server Error',
    details: process.env.NODE_ENV === 'production' ? 'Something went wrong' : err.message
  });
};

module.exports = errorHandler;
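The error handler dispatches on `err.name`, but the code shown so far only throws plain `Error` objects, whose name is `'Error'`, so every failure ends up as a 500. A small set of error classes would let services signal the intended status. The following is a sketch under that assumption; `AppError` and its subclasses are hypothetical and are not defined anywhere in the original project.

```javascript
// Hypothetical base class: sets `name` from the class so that a handler
// which dispatches on err.name can recognize the error type.
class AppError extends Error {
  constructor(message, statusCode) {
    super(message);
    this.name = this.constructor.name;
    this.statusCode = statusCode;
  }
}

class ValidationError extends AppError {
  constructor(message, details) {
    super(message, 400);
    this.details = details;
  }
}

class NotFoundError extends AppError {
  constructor(message = 'Resource not found') {
    super(message, 404);
  }
}

// Usage sketch: a service throws a typed error instead of a plain Error
function findTool(tools, toolId) {
  const tool = tools.get(toolId);
  if (!tool) {
    throw new NotFoundError(`Tool ${toolId} not found`);
  }
  return tool;
}
```

With classes like these, `toolService.executeTool` could throw `NotFoundError` for an unknown tool and the existing handler would answer with 404 without any change.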
requestLogger.js - request-logging middleware:
// src/middlewares/requestLogger.js
const { v4: uuidv4 } = require('uuid');
const logger = require('../config/logger');

/**
 * Request-logging middleware
 * @param {Object} req - Request object
 * @param {Object} res - Response object
 * @param {Function} next - Next middleware
 */
const requestLogger = (req, res, next) => {
  // Reuse the caller's request ID when present, otherwise generate one
  req.requestId = req.headers['x-request-id'] || uuidv4();
  res.setHeader('X-Request-ID', req.requestId);

  // Log the start of the request
  logger.debug(`Request started: ${req.method} ${req.path}`, {
    requestId: req.requestId,
    method: req.method,
    path: req.originalUrl || req.url,
    ip: req.ip,
    userAgent: req.get('user-agent')
  });

  // Log the end of the request, with its duration
  const start = Date.now();
  res.on('finish', () => {
    const duration = Date.now() - start;
    const level = res.statusCode >= 400 ? 'warn' : 'debug';

    logger[level](`Request completed: ${req.method} ${req.path} ${res.statusCode} (${duration}ms)`, {
      requestId: req.requestId,
      method: req.method,
      path: req.originalUrl || req.url,
      statusCode: res.statusCode,
      duration
    });
  });

  next();
};

module.exports = requestLogger;
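5. Summary and Reflections
5.1 Key Points of the Backend Architecture
This note built a backend service that mimics OpenHands functionality. Its key points are:
- Layered architecture: a controller-service-model split that separates concerns
- Event-driven pattern: EventEmitter provides loosely coupled communication between services
- Sandbox isolation: Docker containers provide an isolated environment for command execution
- Plugin-style tool system: an extensible mechanism for registering and executing tools
- Session state management: session creation, persistence, and restoration
- Error-handling strategy: a single error-handling middleware that returns consistent responses
- Configuration management: defaults that can be overridden through environment variables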
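5.2 Comparison with the Actual OpenHands Architecture
Because this implementation is based on inferences about OpenHands functionality, it may differ from the real system in the following ways:
- Scale and complexity: the real system likely has more features and a much larger codebase
- Performance optimization: a production system likely adds measures such as caching and queues
- Security: stricter access control and stronger isolation mechanisms
- Distributed architecture: it may use microservices rather than the monolith built here
- A more complete plugin system: possibly a more flexible plugin mechanism and a richer tool set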
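5.3 Technical Decisions Made Along the Way
Several technical decisions made during development are worth noting:
- Node.js: its non-blocking I/O and event-driven model suit a responsive AI agent system
- Docker integration: an isolated code-execution environment is a key safety measure for an AI agent system
- Session persistence: the file system is used instead of a database, which simplifies deployment and suits small to medium workloads
- Modular design: well-defined interfaces let each component be tested and replaced independently
- Unified error handling: a centralized strategy that yields consistent error responses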
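File-based session persistence has one pitfall worth illustrating: if the process dies in the middle of a write, the session file can be left truncated. A common remedy is to write to a temporary file and then rename it over the target. The sketch below assumes sessions are plain JSON-serializable objects and that `sessionId` is a trusted, file-name-safe string; `saveSession` and `loadSession` are hypothetical names, not functions from the project.

```javascript
const fs = require('fs').promises;
const path = require('path');

// Hypothetical helper: writes the session to a temp file first and then
// renames it into place, so readers never observe a half-written file.
async function saveSession(dir, sessionId, data) {
  await fs.mkdir(dir, { recursive: true });
  const target = path.join(dir, `${sessionId}.json`);
  const temp = `${target}.${process.pid}.tmp`;

  await fs.writeFile(temp, JSON.stringify(data, null, 2), 'utf8');
  await fs.rename(temp, target);
  return target;
}

// Hypothetical helper: returns the stored session, or null when none exists.
async function loadSession(dir, sessionId) {
  try {
    const raw = await fs.readFile(path.join(dir, `${sessionId}.json`), 'utf8');
    return JSON.parse(raw);
  } catch (error) {
    if (error.code === 'ENOENT') return null;
    throw error;
  }
}
```

The rename only replaces the file in one step when the temporary file and the target are on the same file system, which is why the temporary file is created next to the target rather than in a system temp directory.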
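6. Next Steps
This note implemented the core OpenHands backend features. The following directions are worth exploring further:
- Database integration: replace file-system storage with MongoDB or PostgreSQL to improve scalability
- Authentication and authorization: implement JWT authentication and role-based access control
- WebSocket support: add real-time communication so clients can receive agent output as it is produced
- Task queue: use Redis or RabbitMQ for asynchronous task processing
- Service monitoring: integrate Prometheus and Grafana
- API documentation: generate API docs automatically with Swagger
- Test coverage: write unit and integration tests
- Containerized deployment: create a Docker Compose configuration to simplify deployment
- CI/CD pipeline: automate testing and deployment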