【Dify(v1.x) 核心源码深入解析】mcp 模块
重磅推荐专栏:
《大模型AIGC》
《课程大纲》
《知识星球》
本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域,包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用,以及与之相关的人工智能生成内容(AIGC)技术。通过深入的技术解析和实践经验分享,旨在帮助读者更好地理解和应用这些领域的最新进展
一、MCP 模块整体架构
1.1 什么是 MCP?
MCP(Model Context Protocol)是 Dify 平台中用于连接大模型与工具服务的标准化协议。它解决了三个核心问题:
- 工具发现:Agent 如何动态发现可用工具
- 安全调用:如何安全执行工具操作
- 结果整合:如何将工具结果整合到模型响应中
1.2 MCP 核心组件
通过代码,我们可以梳理出 MCP 的核心架构:
二、Agent 工作全流程解析
2.1 完整调用时序
当用户提问时,Agent 通过 MCP 调用工具插件的完整过程:
2.2 各阶段 Prompt 构造详解
- 阶段1:工具发现(list_tools)
Agent 构造的提示词:
你是一个智能助手,可以调用外部工具解决问题。
当前可用工具列表:
{{TOOL_LIST}}用户问题:{{USER_QUERY}}
请分析是否需要调用工具?需要调用哪个工具?
- 阶段2:工具参数解析
Agent 构造的提示词:
你将调用工具:{{TOOL_NAME}}
工具描述:{{TOOL_DESCRIPTION}}
工具参数要求:
{{PARAMETER_SCHEMA}}用户输入:{{USER_INPUT}}
请提取符合工具要求的参数JSON
- 阶段3:结果整合
Agent 构造的提示词:
工具调用结果:
{{TOOL_RESULTS}}原始用户问题:{{USER_QUERY}}
请根据以上信息生成最终回答
三、核心代码深度解析
3.1 MCP 客户端初始化
class MCPClient:def __init__(self,server_url: str, # MCP 服务端URLprovider_id: str, # 工具提供商标识tenant_id: str, # 租户IDauthed: bool = True, # 是否需要认证authorization_code: Optional[str] = None, # OAuth授权码):# 初始化基础信息self.provider_id = provider_idself.tenant_id = tenant_idself.server_url = server_url# 认证相关初始化self.authed = authedself.authorization_code = authorization_codeif authed:from core.mcp.auth.auth_provider import OAuthClientProvider# 创建OAuth提供者self.provider = OAuthClientProvider(self.provider_id, self.tenant_id)# 获取访问令牌self.token = self.provider.tokens()# 会话管理初始化self._session = Noneself.exit_stack = ExitStack() # 上下文管理栈
3.2 工具调用核心流程
def invoke_tool(self, tool_name: str, tool_args: dict):# 检查会话是否初始化if not self._initialized or not self._session:raise ValueError("Session not initialized.")# 通过ClientSession执行工具调用return self._session.call_tool(tool_name, tool_args)# ClientSession中的实际调用
class ClientSession(BaseSession):def call_tool(self,name: str,arguments: dict[str, Any] | None = None,) -> types.CallToolResult:# 构造工具调用请求request = types.CallToolRequest(method="tools/call",params=types.CallToolRequestParams(name=name, arguments=arguments))# 发送请求并获取响应return self.send_request(types.ClientRequest(request),types.CallToolResult)
3.3 服务端工具执行
class MCPServerStreamableHTTPRequestHandler:def invoke_tool(self):# 获取当前用户if not self.end_user:raise ValueError("User not found")# 解析工具调用请求request = cast(types.CallToolRequest, self.request.root)args = request.params.arguments# 根据不同应用模式构造参数if self.app.mode == AppMode.WORKFLOW:args = {"inputs": args}elif self.app.mode == AppMode.COMPLETION:args = {"query": "", "inputs": args}else:args = {"query": args["query"], "inputs": {k:v for k,v in args.items() if k != "query"}}# 调用AppGenerateService执行实际工具操作response = AppGenerateService.generate(self.app,self.end_user,args,InvokeFrom.SERVICE_API,streaming=self.app.mode == AppMode.AGENT_CHAT.value,)# 处理流式响应answer = ""if isinstance(response, RateLimitGenerator):for item in response.generator:# 解析流式响应中的有效数据if item.startswith("data: "):try:parsed = json.loads(item[6:].strip())if parsed.get("event") == "agent_thought":answer += parsed.get("thought", "")# 构造工具调用响应return types.CallToolResult(content=[types.TextContent(text=answer, type="text")])
四、认证与安全机制
4.1 OAuth 2.0 认证流程
4.2 认证代码实现
class OAuthClientProvider:def __init__(self, provider_id: str, tenant_id: str, for_list: bool = False):# 获取MCP提供者配置if for_list:self.mcp_provider = MCPToolManageService.get_mcp_provider_by_provider_id(provider_id, tenant_id)else:self.mcp_provider = MCPToolManageService.get_mcp_provider_by_server_identifier(provider_id, tenant_id)def tokens(self) -> Optional[OAuthTokens]:# 从数据库获取存储的tokencredentials = self.mcp_provider.decrypted_credentialsif not credentials:return Nonereturn OAuthTokens(access_token=credentials.get("access_token", ""),token_type=credentials.get("token_type", "Bearer"),expires_in=int(credentials.get("expires_in", "3600") or 3600),refresh_token=credentials.get("refresh_token", ""),)def save_tokens(self, tokens: OAuthTokens) -> None:# 将新token存储到数据库token_dict = tokens.model_dump()MCPToolManageService.update_mcp_provider_credentials(self.mcp_provider, token_dict, authed=True)
五、传输层实现剖析
5.1 双工通信模型
MCP 使用生产者-消费者模式实现双工通信:
5.2 Streamable HTTP 传输实现
@contextmanager
def streamablehttp_client(url, headers=None, timeout=30, sse_read_timeout=300):transport = StreamableHTTPTransport(url, headers, timeout, sse_read_timeout)# 创建通信队列server_to_client = queue.Queue() # 服务端->客户端client_to_server = queue.Queue() # 客户端->服务端with ThreadPoolExecutor(max_workers=2) as executor:with create_ssrf_proxy_mcp_http_client() as client:# 启动写线程executor.submit(transport.post_writer,client,client_to_server, # 从该队列读取请求server_to_client, # 向该队列写入响应)# 启动读线程executor.submit(transport.handle_get_stream,client,server_to_client)yield (server_to_client, # 客户端读取此队列client_to_server, # 客户端写入此队列transport.get_session_id)
六、性能优化策略
6.1 连接复用机制
通过会话管理实现连接复用:
class MCPClient:def __enter__(self):self._initialize() # 初始化连接self._initialized = Truereturn selfdef __exit__(self, exc_type, exc_value, traceback):self.cleanup() # 清理资源def _initialize(self):"""根据URL自动选择最优传输协议"""connection_methods = {"mcp": streamablehttp_client, # HTTP长连接"sse": sse_client, # SSE流}# 自动协议选择逻辑parsed_url = urlparse(self.server_url)path_segment = parsed_url.path.rstrip("/").split("/")[-1] or ""try:# 首先尝试根据路径选择协议client_factory = connection_methods[path_segment]self.connect_server(client_factory, path_segment)except KeyError:# 自动回退机制try:self.connect_server(sse_client, "sse")except MCPConnectionError:self.connect_server(streamablehttp_client, "mcp")
6.2 负载均衡策略
七、应用实践案例
7.1 天气预报工具集成
工具定义:
{"name": "weather_forecast","description": "获取指定位置的天气预报","inputSchema": {"type": "object","properties": {"location": {"type": "string","description": "城市名称"},"date": {"type": "string","format": "date","description": "日期 (YYYY-MM-DD)"}},"required": ["location"]}
}
调用过程:
- Agent 识别用户需要天气预报
- 通过 MCPClient.list_tools() 获取工具
- 构造参数调用 invoke_tool(“weather_forecast”, {“location”: “北京”})
- 服务端调用天气 API 获取数据
- 返回结构化结果:
{
“content”: [{
“type”: “text”,
“text”: “北京今日天气:晴,25℃”
}]
}
八、总结与展望
8.1 MCP 核心价值
• 标准化协议:统一工具接入规范
• 安全隔离:OAuth2.0 + SSRF 防护
• 灵活扩展:支持多种传输协议
• 性能优化:连接复用 + 流式处理
8.2 未来演进方向
- 协议扩展:支持 gRPC 等高性能协议
- 联邦学习:跨工具的知识共享
- 智能路由:基于工具性能的自动路由
- 缓存优化:工具结果的智能缓存
MCP 模块作为 Dify 平台的工具连接层,通过标准化的协议设计和灵活的实现架构,使 Agent 能够安全高效地集成和使用各种外部工具,极大地扩展了大模型的应用边界。随着工具的不断丰富和协议的持续演进,MCP 将在构建更强大的 AI Agent 生态中发挥关键作用。