当前位置: 首页 > news >正文

【Dify(v1.x) 核心源码深入解析】mcp 模块

重磅推荐专栏:
《大模型AIGC》
《课程大纲》
《知识星球》

本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域,包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用,以及与之相关的人工智能生成内容(AIGC)技术。通过深入的技术解析和实践经验分享,旨在帮助读者更好地理解和应用这些领域的最新进展

一、MCP 模块整体架构

1.1 什么是 MCP?

MCP(Model Context Protocol)是 Dify 平台中用于连接大模型与工具服务的标准化协议。它解决了三个核心问题:

  • 工具发现:Agent 如何动态发现可用工具
  • 安全调用:如何安全执行工具操作
  • 结果整合:如何将工具结果整合到模型响应中
    在这里插入图片描述

1.2 MCP 核心组件

通过代码,我们可以梳理出 MCP 的核心架构:
在这里插入图片描述

二、Agent 工作全流程解析

2.1 完整调用时序

当用户提问时,Agent 通过 MCP 调用工具插件的完整过程:
在这里插入图片描述

2.2 各阶段 Prompt 构造详解

  • 阶段1:工具发现(list_tools)

Agent 构造的提示词:

你是一个智能助手,可以调用外部工具解决问题。
当前可用工具列表:
{{TOOL_LIST}}用户问题:{{USER_QUERY}}
请分析是否需要调用工具?需要调用哪个工具?
  • 阶段2:工具参数解析

Agent 构造的提示词:

你将调用工具:{{TOOL_NAME}}
工具描述:{{TOOL_DESCRIPTION}}
工具参数要求:
{{PARAMETER_SCHEMA}}用户输入:{{USER_INPUT}}
请提取符合工具要求的参数JSON
  • 阶段3:结果整合

Agent 构造的提示词:

工具调用结果:
{{TOOL_RESULTS}}原始用户问题:{{USER_QUERY}}
请根据以上信息生成最终回答

三、核心代码深度解析

3.1 MCP 客户端初始化

class MCPClient:def __init__(self,server_url: str,          # MCP 服务端URLprovider_id: str,          # 工具提供商标识tenant_id: str,            # 租户IDauthed: bool = True,       # 是否需要认证authorization_code: Optional[str] = None,  # OAuth授权码):# 初始化基础信息self.provider_id = provider_idself.tenant_id = tenant_idself.server_url = server_url# 认证相关初始化self.authed = authedself.authorization_code = authorization_codeif authed:from core.mcp.auth.auth_provider import OAuthClientProvider# 创建OAuth提供者self.provider = OAuthClientProvider(self.provider_id, self.tenant_id)# 获取访问令牌self.token = self.provider.tokens()# 会话管理初始化self._session = Noneself.exit_stack = ExitStack()  # 上下文管理栈

3.2 工具调用核心流程

def invoke_tool(self, tool_name: str, tool_args: dict):# 检查会话是否初始化if not self._initialized or not self._session:raise ValueError("Session not initialized.")# 通过ClientSession执行工具调用return self._session.call_tool(tool_name, tool_args)# ClientSession中的实际调用
class ClientSession(BaseSession):def call_tool(self,name: str,arguments: dict[str, Any] | None = None,) -> types.CallToolResult:# 构造工具调用请求request = types.CallToolRequest(method="tools/call",params=types.CallToolRequestParams(name=name, arguments=arguments))# 发送请求并获取响应return self.send_request(types.ClientRequest(request),types.CallToolResult)

3.3 服务端工具执行

class MCPServerStreamableHTTPRequestHandler:def invoke_tool(self):# 获取当前用户if not self.end_user:raise ValueError("User not found")# 解析工具调用请求request = cast(types.CallToolRequest, self.request.root)args = request.params.arguments# 根据不同应用模式构造参数if self.app.mode == AppMode.WORKFLOW:args = {"inputs": args}elif self.app.mode == AppMode.COMPLETION:args = {"query": "", "inputs": args}else:args = {"query": args["query"], "inputs": {k:v for k,v in args.items() if k != "query"}}# 调用AppGenerateService执行实际工具操作response = AppGenerateService.generate(self.app,self.end_user,args,InvokeFrom.SERVICE_API,streaming=self.app.mode == AppMode.AGENT_CHAT.value,)# 处理流式响应answer = ""if isinstance(response, RateLimitGenerator):for item in response.generator:# 解析流式响应中的有效数据if item.startswith("data: "):try:parsed = json.loads(item[6:].strip())if parsed.get("event") == "agent_thought":answer += parsed.get("thought", "")# 构造工具调用响应return types.CallToolResult(content=[types.TextContent(text=answer, type="text")])

四、认证与安全机制

4.1 OAuth 2.0 认证流程

在这里插入图片描述

4.2 认证代码实现

class OAuthClientProvider:def __init__(self, provider_id: str, tenant_id: str, for_list: bool = False):# 获取MCP提供者配置if for_list:self.mcp_provider = MCPToolManageService.get_mcp_provider_by_provider_id(provider_id, tenant_id)else:self.mcp_provider = MCPToolManageService.get_mcp_provider_by_server_identifier(provider_id, tenant_id)def tokens(self) -> Optional[OAuthTokens]:# 从数据库获取存储的tokencredentials = self.mcp_provider.decrypted_credentialsif not credentials:return Nonereturn OAuthTokens(access_token=credentials.get("access_token", ""),token_type=credentials.get("token_type", "Bearer"),expires_in=int(credentials.get("expires_in", "3600") or 3600),refresh_token=credentials.get("refresh_token", ""),)def save_tokens(self, tokens: OAuthTokens) -> None:# 将新token存储到数据库token_dict = tokens.model_dump()MCPToolManageService.update_mcp_provider_credentials(self.mcp_provider, token_dict, authed=True)

五、传输层实现剖析

5.1 双工通信模型

MCP 使用生产者-消费者模式实现双工通信:
在这里插入图片描述

5.2 Streamable HTTP 传输实现

@contextmanager
def streamablehttp_client(url, headers=None, timeout=30, sse_read_timeout=300):transport = StreamableHTTPTransport(url, headers, timeout, sse_read_timeout)# 创建通信队列server_to_client = queue.Queue()  # 服务端->客户端client_to_server = queue.Queue()  # 客户端->服务端with ThreadPoolExecutor(max_workers=2) as executor:with create_ssrf_proxy_mcp_http_client() as client:# 启动写线程executor.submit(transport.post_writer,client,client_to_server,  # 从该队列读取请求server_to_client,  # 向该队列写入响应)# 启动读线程executor.submit(transport.handle_get_stream,client,server_to_client)yield (server_to_client,  # 客户端读取此队列client_to_server,   # 客户端写入此队列transport.get_session_id)

六、性能优化策略

6.1 连接复用机制

通过会话管理实现连接复用:

class MCPClient:def __enter__(self):self._initialize()  # 初始化连接self._initialized = Truereturn selfdef __exit__(self, exc_type, exc_value, traceback):self.cleanup()  # 清理资源def _initialize(self):"""根据URL自动选择最优传输协议"""connection_methods = {"mcp": streamablehttp_client,  # HTTP长连接"sse": sse_client,            # SSE流}# 自动协议选择逻辑parsed_url = urlparse(self.server_url)path_segment = parsed_url.path.rstrip("/").split("/")[-1] or ""try:# 首先尝试根据路径选择协议client_factory = connection_methods[path_segment]self.connect_server(client_factory, path_segment)except KeyError:# 自动回退机制try:self.connect_server(sse_client, "sse")except MCPConnectionError:self.connect_server(streamablehttp_client, "mcp")

6.2 负载均衡策略

在这里插入图片描述

七、应用实践案例

7.1 天气预报工具集成

工具定义:

{"name": "weather_forecast","description": "获取指定位置的天气预报","inputSchema": {"type": "object","properties": {"location": {"type": "string","description": "城市名称"},"date": {"type": "string","format": "date","description": "日期 (YYYY-MM-DD)"}},"required": ["location"]}
}

调用过程:

  1. Agent 识别用户需要天气预报
  2. 通过 MCPClient.list_tools() 获取工具
  3. 构造参数调用 invoke_tool(“weather_forecast”, {“location”: “北京”})
  4. 服务端调用天气 API 获取数据
  5. 返回结构化结果:
    {
    “content”: [{
    “type”: “text”,
    “text”: “北京今日天气:晴,25℃”
    }]
    }

八、总结与展望

8.1 MCP 核心价值

• 标准化协议:统一工具接入规范

• 安全隔离:OAuth2.0 + SSRF 防护

• 灵活扩展:支持多种传输协议

• 性能优化:连接复用 + 流式处理

8.2 未来演进方向

  1. 协议扩展:支持 gRPC 等高性能协议
  2. 联邦学习:跨工具的知识共享
  3. 智能路由:基于工具性能的自动路由
  4. 缓存优化:工具结果的智能缓存

MCP 模块作为 Dify 平台的工具连接层,通过标准化的协议设计和灵活的实现架构,使 Agent 能够安全高效地集成和使用各种外部工具,极大地扩展了大模型的应用边界。随着工具的不断丰富和协议的持续演进,MCP 将在构建更强大的 AI Agent 生态中发挥关键作用。

http://www.lryc.cn/news/584981.html

相关文章:

  • 4.丢出异常捕捉异常TryCatch C#例子
  • USB数据丢包真相:为什么log打印会导致高频USB数据丢包?
  • mysql数据库导入导出命令
  • 【Linux-云原生-笔记】系统引导修复(grub、bios、内核、系统初始化等)
  • Grok-4 发布会图文总结
  • 苹果UI 设计
  • SLICEGPT: COMPRESS LARGE LANGUAGE MODELSBY DELETING ROWS AND COLUMNS
  • Deepseek-如何从零开始开发需要专业知识的prompt
  • 8155平台SPI学习笔记
  • 从零实现一个GPT 【React + Express】--- 【4】实现文生图的功能
  • 深入剖析Spring Bean生命周期:从诞生到消亡的全过程
  • 英文国际期刊推荐:MEDS Chinese Medicine,中医药方向可发
  • 47-RK3588 用瑞芯微官方提供recovery进行OTA升级
  • Auto-GPT 简易教程
  • 前端抓包(不启动前端项目就能进行后端调试)--whistle
  • UI前端与数字孪生融合新领域:智慧环保的垃圾分类与回收系统
  • Windos服务器升级MySQL版本
  • 中国银联豪掷1亿采购海光C86架构服务器
  • 如何查看自己本地的公网IP地址?内网环境网络如何开通服务器公网ip提供互联网访问?
  • 电力分析仪的“双语对话”:CCLinkIE与Modbus TCP的无缝连接
  • 从《哪吒 2》看个人IP的破局之道|创客匠人
  • 【实用IP查询工具】IP数据云-IP地址查询离线库使用方案
  • 服务器机柜与网络机柜各自的优势
  • 解决Linux绑定失败地址已使用(端口被占用)的问题
  • python的卷烟营销数据统计分析系统
  • AIStarter新版重磅来袭!永久订阅限时福利抢先看
  • Spring Cloud Gateway介绍 - -基础概念,简单工作原理和配置示例
  • Element Plus和Ant Design Vue深度对比分析与选型指南
  • 飞算 JavaAI:开启 Java 开发新时代
  • C++——构造函数的补充:初始化列表