当前位置: 首页 > article >正文

吴恩达MCP课程(3):mcp_chatbot

原课程代码是用Anthropic写的,下面代码是用OpenAI改写的,模型则用阿里巴巴的模型做测试
.env 文件为:

OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
OPENAI_API_BASE=https://dashscope.aliyuncs.com/compatible-mode/v1

目录

    • 代码
    • 代码解释
      • 1. 导入和初始化
      • 2. MCP_ChatBot类初始化
      • 3. 查询处理核心方法
        • 3.1 消息处理循环
        • 3.2 工具调用处理
      • 4. 聊天循环
      • 5. 服务器连接和工具初始化
      • 6. 程序入口
      • 核心特性
    • 示例

代码

from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
from typing import List
import asyncio
import json
import osload_dotenv()class MCP_ChatBot:def __init__(self):# Initialize session and client objectsself.session: ClientSession = Noneself.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))self.available_tools: List[dict] = []async def process_query(self, query):messages = [{'role':'user', 'content':query}]response = self.client.chat.completions.create(model='qwen-turbo',max_tokens=2024,tools=self.available_tools,messages=messages)process_query = Truewhile process_query:# 获取助手的回复message = response.choices[0].message# 检查是否有普通文本内容if message.content:print(message.content)process_query = False# 检查是否有工具调用elif message.tool_calls:# 添加助手消息到历史messages.append({"role": "assistant", "content": None,"tool_calls": message.tool_calls})# 处理每个工具调用for tool_call in message.tool_calls:tool_id = tool_call.idtool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)print(f"Calling tool {tool_name} with args {tool_args}")# 执行工具调用result = await self.session.call_tool(tool_name, arguments=tool_args)# 添加工具结果到消息历史messages.append({"role": "tool","tool_call_id": tool_id,"content": result.content})# 获取下一个回复response = self.client.chat.completions.create(model='qwen-turbo',max_tokens=2024,tools=self.available_tools,messages=messages)# 如果只有文本回复,则结束处理if response.choices[0].message.content and not response.choices[0].message.tool_calls:print(response.choices[0].message.content)process_query = Falseasync def chat_loop(self):"""Run an interactive chat loop"""print("\nMCP Chatbot Started!")print("Type your queries or 'quit' to exit.")while True:try:query = input("\nQuery: ").strip()if query.lower() == 'quit':breakawait self.process_query(query)print("\n")except Exception as e:print(f"\nError: {str(e)}")async def connect_to_server_and_run(self):# Create server parameters for stdio connectionserver_params = StdioServerParameters(command="uv",  # Executableargs=["run", "research_server.py"],  # Optional command line argumentsenv=None,  # Optional environment variables)async with stdio_client(server_params) as (read, write):async with ClientSession(read, write) as session:self.session = session# Initialize the connectionawait session.initialize()# List available toolsresponse = await session.list_tools()tools = response.toolsprint("\nConnected to server with tools:", [tool.name for tool in tools])self.available_tools = [{"type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema}} for tool in response.tools]await self.chat_loop()async def main():chatbot = MCP_ChatBot()await chatbot.connect_to_server_and_run()if __name__ == "__main__":asyncio.run(main())

代码解释

1. 导入和初始化

from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
from typing import List
import asyncio
import json
import osload_dotenv()
  • load_dotenv(): 加载环境变量文件(.env)中的配置
  • openai: OpenAI API客户端库
  • mcp: Model Context Protocol相关模块,用于与MCP服务器通信
  • asyncio: 异步编程支持
  • json: JSON数据处理

2. MCP_ChatBot类初始化

class MCP_ChatBot:def __init__(self):self.session: ClientSession = Noneself.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))self.available_tools: List[dict] = []
  • session: MCP客户端会话,用于与MCP服务器通信
  • client: OpenAI客户端,配置API密钥和基础URL
  • available_tools: 存储从MCP服务器获取的可用工具列表

3. 查询处理核心方法

async def process_query(self, query):messages = [{'role':'user', 'content':query}]response = self.client.chat.completions.create(model='qwen-turbo',max_tokens=2024,tools=self.available_tools,messages=messages)

这个方法是整个系统的核心,处理用户查询的完整流程:

3.1 消息处理循环
process_query = True
while process_query:message = response.choices[0].messageif message.content:print(message.content)process_query = False
  • 检查AI回复是否包含文本内容
  • 如果有文本内容,直接输出并结束处理
3.2 工具调用处理
elif message.tool_calls:messages.append({"role": "assistant", "content": None,"tool_calls": message.tool_calls})for tool_call in message.tool_calls:tool_id = tool_call.idtool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)result = await self.session.call_tool(tool_name, arguments=tool_args)messages.append({"role": "tool","tool_call_id": tool_id,"content": result.content})

工具调用处理流程:

  1. 将助手的工具调用请求添加到消息历史
  2. 遍历每个工具调用请求
  3. 提取工具名称、参数和调用ID
  4. 通过MCP会话执行实际的工具调用
  5. 将工具执行结果添加到消息历史
  6. 继续与AI对话,获取基于工具结果的最终回复

4. 聊天循环

async def chat_loop(self):print("\nMCP Chatbot Started!")print("Type your queries or 'quit' to exit.")while True:try:query = input("\nQuery: ").strip()if query.lower() == 'quit':breakawait self.process_query(query)print("\n")except Exception as e:print(f"\nError: {str(e)}")
  • 提供交互式命令行界面
  • 持续接收用户输入
  • 调用process_query处理每个查询
  • 包含异常处理机制

5. 服务器连接和工具初始化

async def connect_to_server_and_run(self):server_params = StdioServerParameters(command="uv",args=["run", "research_server.py"],env=None,)async with stdio_client(server_params) as (read, write):async with ClientSession(read, write) as session:self.session = sessionawait session.initialize()response = await session.list_tools()tools = response.toolsself.available_tools = [{"type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema}} for tool in response.tools]await self.chat_loop()

服务器连接流程:

  1. 配置MCP服务器参数(使用uv运行research_server.py)
  2. 建立stdio通信连接
  3. 创建客户端会话并初始化
  4. 获取服务器提供的工具列表
  5. 将工具格式转换为OpenAI兼容格式
  6. 启动聊天循环

6. 程序入口

async def main():chatbot = MCP_ChatBot()await chatbot.connect_to_server_and_run()if __name__ == "__main__":asyncio.run(main())
  • 创建聊天机器人实例
  • 启动异步主程序

核心特性

  1. 异步处理: 全程使用async/await,支持高并发
  2. 工具集成: 通过MCP协议动态获取和调用外部工具
  3. 对话连续性: 维护完整的对话历史,支持多轮对话
  4. 错误处理: 包含异常捕获和错误提示
  5. 灵活配置: 通过环境变量配置API密钥和服务器地址

示例

uv run mcp_chatbot.py

请添加图片描述
其他相关链接:
吴恩达MCP课程(1):chat_bot
吴恩达MCP课程(2):research_server

http://www.lryc.cn/news/2394504.html

相关文章:

  • MySQL访问控制与账号管理:原理、技术与最佳实践
  • AWS 创建VPC 并且添加权限控制
  • langchain学习 01
  • 【清晰教程】查看和修改Git配置情况
  • JAVA 常用 API 正则表达式
  • 光电设计大赛智能车激光对抗方案分享:低成本高效备赛攻略
  • Python实现P-PSO优化算法优化BP神经网络回归模型项目实战
  • Microsoft的在word中选择文档中的所有表格进行字体和格式的调整时的解决方案
  • C++23:关键特性与最新进展深度解析
  • Rust并发编程实践指南
  • Kubernetes资源申请沾满但是实际的资源占用并不多,是怎么回事?
  • 鲲鹏Arm+麒麟V10 K8s 离线部署教程
  • PGSQL结合linux cron定期执行vacuum_full_analyze命令
  • php 中使用MQTT
  • C#定时器深度对比:System.Timers.Timer vs System.Threading.Timer性能实测与选型指南
  • go的select多路复用
  • 深度理解与剖析:前端声明式组件系统
  • 解决8080端口被占问题
  • 介绍一种LDPC码译码器
  • 3DMAX+Photoshop教程:将树木和人物添加到户外建筑场景中的方法
  • 【IOS】【OC】【应用内打印功能的实现】如何在APP内实现打印功能,连接本地打印机,把想要打印的界面打印成图片
  • 随记 配置服务器的ssl整个过程
  • 数据库高可用架构设计:集群、负载均衡与故障转移实践
  • Correlations氛围测试:文本或图像的相似度热图
  • 从0到1:多医院陪诊小程序开发笔记(上)
  • 建立连接后 TCP 请求卡住
  • 尚硅谷redis7 99 springboot整合redis之连接集群
  • hive 笔记
  • 无线通信模块简介
  • Go语言之空接口与类型断言