当前位置: 首页 > news >正文

小智AI模型接入MCP

ESP32接入小智AI的MCP架构

  1. 整体架构图
    ESP32设备 ←→ MCP服务器 ←→ 小智AI云端 ←→ 用户语音终端
    ↑ ↑ ↑ ↑
    本地GPIO 工具转换 AI模型理解 语音识别
    音量控制 协议适配 工具调用 语音合成
  2. MCP服务器实现
    基于小智AI的要求,需要创建一个MCP服务器来桥接ESP32和小智AI:

esp32_mcp_server.py

# esp32_mcp_server.py
from mcp.server.fastmcp import FastMCP
import logging
import asyncio
import websockets
import json
from typing import Dict, Anylogger = logging.getLogger('esp32_mcp')# 创建MCP服务器
mcp = FastMCP("ESP32_Device_Controller")# ESP32设备连接管理
class ESP32Manager:def __init__(self):self.devices = {}  # device_id -> websocket连接async def connect_device(self, device_id: str, websocket_url: str):"""连接到ESP32设备"""try:websocket = await websockets.connect(websocket_url)self.devices[device_id] = websocketlogger.info(f"Connected to ESP32 device: {device_id}")return Trueexcept Exception as e:logger.error(f"Failed to connect to device {device_id}: {e}")return Falseasync def send_mcp_request(self, device_id: str, method: str, params: Dict = None) -> Dict:"""向ESP32发送MCP请求"""if device_id not in self.devices:raise Exception(f"Device {device_id} not connected")websocket = self.devices[device_id]request = {"jsonrpc": "2.0","id": 1,"method": method,"params": params or {}}await websocket.send(json.dumps(request))response = await websocket.recv()return json.loads(response)# 全局设备管理器
esp32_manager = ESP32Manager()# 在启动时连接到ESP32设备
async def initialize_devices():# 这里配置您的ESP32设备WebSocket地址await esp32_manager.connect_device("esp32_001", "ws://192.168.1.100:80/mcp")@mcp.tool()
def get_device_status(device_id: str = "esp32_001") -> dict:"""Get the current status of ESP32 device including volume, GPIO states, battery, network info. Use this tool to check device condition before controlling it."""try:loop = asyncio.get_event_loop()response = loop.run_until_complete(esp32_manager.send_mcp_request(device_id, "tools/call", {"name": "self.get_device_status","arguments": {}}))logger.info(f"Device status: {response}")return {"success": True, "status": response.get("result", {})}except Exception as e:logger.error(f"Failed to get device status: {e}")return {"success": False, "error": str(e)}@mcp.tool()
def set_audio_volume(volume: int, device_id: str = "esp32_001") -> dict:"""Set the audio speaker volume of ESP32 device. Volume should be between 0 and 100."""try:loop = asyncio.get_event_loop()response = loop.run_until_complete(esp32_manager.send_mcp_request(device_id, "tools/call", {"name": "self.audio_speaker.set_volume","arguments": {"volume": volume}}))logger.info(f"Set volume to {volume}: {response}")return {"success": True, "volume": volume}except Exception as e:logger.error(f"Failed to set volume: {e}")return {"success": False, "error": str(e)}@mcp.tool()
def control_gpio_output(pin_number: int, state: bool, device_id: str = "esp32_001") -> dict:"""Control GPIO pin output state on ESP32 device. Set pin to high (true) or low (false) level."""try:loop = asyncio.get_event_loop()response = loop.run_until_complete(esp32_manager.send_mcp_request(device_id, "tools/call", {"name": "self.gpio.set_output","arguments": {"pin": pin_number, "state": state}}))logger.info(f"Set GPIO{pin_number} to {'HIGH' if state else 'LOW'}: {response}")return {"success": True, "pin": pin_number, "state": state}except Exception as e:logger.error(f"Failed to control GPIO: {e}")return {"success": False, "error": str(e)}@mcp.tool()
def read_gpio_input(pin_number: int, pull_mode: str = "none", device_id: str = "esp32_001") -> dict:"""Read GPIO pin input state from ESP32 device. Pull mode can be 'none', 'up', or 'down'."""try:loop = asyncio.get_event_loop()response = loop.run_until_complete(esp32_manager.send_mcp_request(device_id, "tools/call", {"name": "self.gpio.get_input","arguments": {"pin": pin_number, "pull_mode": pull_mode}}))logger.info(f"Read GPIO{pin_number}: {response}")return {"success": True, "pin": pin_number, "state": response.get("result", {})}except Exception as e:logger.error(f"Failed to read GPIO: {e}")return {"success": False, "error": str(e)}@mcp.tool()
def control_led(action: str, device_id: str = "esp32_001") -> dict:"""Control the built-in LED on ESP32 device. Action can be 'on', 'off', 'toggle', or 'status'."""try:loop = asyncio.get_event_loop()response = loop.run_until_complete(esp32_manager.send_mcp_request(device_id, "tools/call", {"name": "self.led.control","arguments": {"action": action}}))logger.info(f"LED {action}: {response}")return {"success": True, "action": action, "result": response.get("result", {})}except Exception as e:logger.error(f"Failed to control LED: {e}")return {"success": False, "error": str(e)}@mcp.tool()
def set_pwm_brightness(brightness: int, device_id: str = "esp32_001") -> dict:"""Set PWM brightness for dimmable devices like LEDs. Brightness should be between 0 and 100 percent."""try:loop = asyncio.get_event_loop()response = loop.run_until_complete(esp32_manager.send_mcp_request(device_id, "tools/call", {"name": "self.pwm.set_brightness","arguments": {"brightness": brightness}}))logger.info(f"Set PWM brightness to {brightness}%: {response}")return {"success": True, "brightness": brightness}except Exception as e:logger.error(f"Failed to set PWM brightness: {e}")return {"success": False, "error": str(e)}# 启动时初始化设备连接
if __name__ == "__main__":# 初始化设备连接asyncio.run(initialize_devices())# 启动MCP服务器mcp.run(transport="stdio")
  1. MCP管道脚本
    创建连接小智AI的管道脚本:
# mcp_pipe.py
import asyncio
import websockets
import json
import subprocess
import sys
import os
import logginglogging.basicConfig(level=logging.INFO)
logger = logging.getLogger('MCP_PIPE')class MCPPipe:def __init__(self, mcp_script_path: str, endpoint: str):self.mcp_script_path = mcp_script_pathself.endpoint = endpointself.process = Noneself.websocket = Noneasync def start_mcp_process(self):"""启动MCP服务器进程"""self.process = await asyncio.create_subprocess_exec(sys.executable, self.mcp_script_path,stdin=asyncio.subprocess.PIPE,stdout=asyncio.subprocess.PIPE,stderr=asyncio.subprocess.PIPE)logger.info(f"Started {self.mcp_script_path} process")async def connect_websocket(self):"""连接到小智AI的WebSocket端点"""logger.info("Connecting to WebSocket server...")self.websocket = await websockets.connect(self.endpoint)logger.info("Successfully connected to WebSocket server")async def handle_websocket_message(self, message):"""处理来自小智AI的消息"""try:# 转发消息到MCP进程self.process.stdin.write(message.encode() + b'\n')await self.process.stdin.drain()# 读取MCP进程的响应response = await self.process.stdout.readline()# 转发响应到小智AIawait self.websocket.send(response.decode().strip())except Exception as e:logger.error(f"Error handling message: {e}")async def run(self):"""运行MCP管道"""await self.start_mcp_process()await self.connect_websocket()try:async for message in self.websocket:await self.handle_websocket_message(message)except websockets.exceptions.ConnectionClosed:logger.info("WebSocket connection closed")finally:if self.process:self.process.terminate()await self.process.wait()async def main():endpoint = os.getenv('MCP_ENDPOINT')if not endpoint:print("Please set MCP_ENDPOINT environment variable")sys.exit(1)if len(sys.argv) != 2:print("Usage: python mcp_pipe.py <mcp_script.py>")sys.exit(1)mcp_script = sys.argv[1]pipe = MCPPipe(mcp_script, endpoint)await pipe.run()if __name__ == "__main__":asyncio.run(main())
  1. 部署和使用
    A. 安装依赖
    pip install mcp websockets asyncio
    B. 配置环境变量

设置小智AI提供的MCP接入点

export MCP_ENDPOINT=“wss://your-xiaozhi-mcp-endpoint”
C. 启动服务

启动ESP32 MCP服务器

python mcp_pipe.py esp32_mcp_server.py
5. ESP32端配置
确保ESP32设备:

启用MCP协议: CONFIG_IOT_PROTOCOL_MCP=y
配置WebSocket服务器: 提供MCP接口
网络连接: 能够被MCP服务器访问
6. 语音控制示例
用户通过小智AI语音终端说话:

用户语音 小智AI理解 MCP工具调用 ESP32执行
“调大音量” 音量控制 set_audio_volume(volume=80) 设置音量到80
“打开LED” LED控制 control_led(action=“on”) GPIO设置LED为高电平
“GPIO2设为高电平” GPIO控制 control_gpio_output(pin_number=2, state=true) 设置GPIO2输出高电平
“检查设备状态” 状态查询 get_device_status() 返回设备完整状态
7. 注意事项
网络配置: 确保MCP服务器能访问ESP32设备
错误处理: 添加设备离线、网络中断的处理逻辑
安全性: 考虑添加设备认证和访问控制
性能优化: 对于频繁操作,考虑连接池和缓存
日志监控: 完善日志记录,便于调试和监控
这样就实现了ESP32设备通过MCP协议接入小智AI的完整方案!

http://www.lryc.cn/news/585535.html

相关文章:

  • 【一起来学AI大模型】微调技术:LoRA(Low-Rank Adaptation) 的实战应用
  • SQL Server通过CLR连接InfluxDB实现异构数据关联查询技术指南
  • SpringBoot JWT
  • Rust与UE5高效集成实战
  • uniapp制作一个个人页面
  • ffmpeg-api记录
  • UC浏览器PC版自2016年后未再更新不支持vue3
  • 小旺AI截图1.2.1版本上线:新增录屏音频、Mac长截屏
  • Docker高级管理--Dockerfile 镜像制作
  • 手把手一起使用Miniforge3+mamba平替Anaconda(Win10)
  • 机器学习week2-线性回归加强
  • Java的extends通配符
  • netdxf—— CAD c#二次开发之(netDxf 处理 DXF 文件)
  • 和鲸社区深度学习基础训练营2025年关卡2(3)pytorch
  • 利用Claude code,只用文字版系统设计大纲,就能轻松实现系统~
  • 免费应用分发平台的安全漏洞和防护机制是什么?
  • 60 美元玩转 Li-Fi —— 开源 OpenVLC 平台入门(附 BeagleBone Black 驱动简单解析)
  • Windows解决 ping 127.0.0.1 一般故障问题
  • 【Linux网络】深入理解HTTP/HTTPS协议:原理、实现与加密机制全面解析
  • 信号量机制
  • 聊聊AI大模型的上下文工程(Context Engineering)
  • Spring 声明式事务:从原理到实现的完整解析
  • 运行ssh -T git@github.com报错
  • 多端协作白板:如何改变传统会议模式!
  • 设计模式 - 面向对象原则:SOLID最佳实践
  • 多态 使用场景
  • 【三维重建工具】NeRFStudio、3D GaussianSplatting、Colmap安装与使用指南
  • VOB如何转换成MP4格式?3种快速转换教程推荐
  • GT IP核仿真测试
  • Kubernetes 高级调度特性