当前位置: 首页 > news >正文

美团搜索推荐统一Agent之交互协议与多Agent协同

🌈 我是“没事学AI”!要是这篇文章让你学 AI 的路上有了点收获:
👁️ 【关注】跟我一起挖 AI 的各种门道,看看它还有多少新奇玩法等着咱们发现
👍 【点赞】为这些有用的 AI 知识鼓鼓掌,让更多人知道学 AI 也能这么轻松
🔖 【收藏】把这些 AI 小技巧存起来,啥时候想练手了,翻出来就能用
💬 【评论】说说你学 AI 时的想法和疑问,让大家的思路碰出更多火花
学 AI 的路还长,咱们结伴同行,在 AI 的世界里找到属于自己的乐趣和成就!

目录

    • 一、详细设计
      • 1. 多Agent交互协议设计
        • 1.1 协议核心模块
        • 1.2 消息格式规范
      • 2. 多Agent协同机制设计
        • 2.1 协同场景核心逻辑
    • 二、具体实现
      • 任务1:交互协议实现
        • 实现步骤:
      • 任务2:多Agent协同逻辑开发
        • 实现步骤:
      • 验证结果

一、详细设计

1. 多Agent交互协议设计

1.1 协议核心模块
模块功能描述技术实现
消息格式定义统一Agent间通信的数据结构(同步/异步)JSON序列化 + 自定义字段校验
通信模式适配同步通信(RPC)与异步通信(Kafka)切换动态代理 + 注解驱动(@Sync/@Async)
消息可靠性保障异步消息重试、幂等性处理Kafka事务消息 + 消息ID去重
序列化工具高效数据序列化/反序列化(支持复杂对象)Protostuff(二进制) + Jackson(JSON)
1.2 消息格式规范
  • 同步通信消息(RPC):轻量结构,侧重实时性

    {"msgId": "sync_123456",  // 消息ID(用于追踪)"sender": "intent_agent", // 发送方Agent名称"receiver": "recall_agent", // 接收方Agent名称"timestamp": 1718900000000, // 发送时间戳"data": {  // 业务数据(与Agent接口输入匹配)"intent": "购买水果","entities": ["苹果", "香蕉"],"confidence": 0.92},"traceId": "trace_789"  // 分布式追踪ID
    }
    
  • 异步通信消息(Kafka):包含状态字段,支持重试

    {"msgId": "async_789012","sender": "rank_agent","receiver": "reason_agent","timestamp": 1718900001000,"data": {"sortedGoodsIds": ["g1001", "g1002"],"rankScores": [0.95, 0.88]},"status": "PENDING",  // 状态:PENDING/SUCCESS/FAILED"retryCount": 0,      // 重试次数"traceId": "trace_789"
    }
    

2. 多Agent协同机制设计

2.1 协同场景核心逻辑
  • 意图-召回协同:意图Agent将实体信息结构化传递给召回Agent,确保召回商品与意图匹配(如“苹果”既可能是水果也可能是手机,需通过实体类型区分)。
  • 动态负载均衡:当某类Agent(如排序Agent)压力过高时,自动将任务分流至备用实例(基于Dubbo负载均衡扩展)。
  • 结果对齐机制:推荐理由生成Agent需关联排序Agent的评分结果,确保理由与商品优先级匹配(如“推荐理由1对应评分最高的商品”)。

二、具体实现

任务1:交互协议实现

实现步骤:
  1. 消息格式与序列化工具开发

    • 定义消息基础类(支持同步/异步通用字段):
      @Data
      public class AgentMessage<T> {private String msgId;private String sender;private String receiver;private long timestamp;private T data;private String traceId;// 异步消息特有字段(通过继承扩展)public static class Async<T> extends AgentMessage<T> {private String status;private int retryCount;}
      }
      
    • 开发序列化工具(支持JSON与二进制切换):
      @Service
      public class MessageSerializer {// JSON序列化(用于调试和简单对象)private final ObjectMapper jsonMapper = new ObjectMapper();// 二进制序列化(用于高性能场景)private final ProtostuffSerializer protoSerializer = new ProtostuffSerializer();public byte[] serialize(Object data, boolean useBinary) {if (useBinary) {return protoSerializer.serialize(data);} else {return jsonMapper.writeValueAsBytes(data);}}public <T> T deserialize(byte[] bytes, Class<T> clazz, boolean useBinary) {if (useBinary) {return protoSerializer.deserialize(bytes, clazz);} else {return jsonMapper.readValue(bytes, clazz);}}
      }
      
  2. 通信模式适配(同步/异步切换)

    • 基于注解驱动实现通信模式选择:
      // 同步通信注解(默认)
      @Retention(RetentionPolicy.RUNTIME)
      @Target(ElementType.METHOD)
      public @interface Sync {int timeout() default 300; // 超时时间(ms)
      }// 异步通信注解
      @Retention(RetentionPolicy.RUNTIME)
      @Target(ElementType.METHOD)
      public @interface Async {String topic() default "agent-communication"; // Kafka主题int maxRetry() default 3; // 最大重试次数
      }// Agent服务接口示例(自动适配通信模式)
      public interface RecallAgent {@Sync(timeout = 500)  // 同步调用RecallResult recall(IntentData data);@Async(topic = "recall-async")  // 异步调用void asyncRecall(IntentData data, Callback callback);
      }
      
  3. 消息可靠性保障

    • 异步消息重试与幂等处理:
      @Component
      public class KafkaMessageListener {@Autowiredprivate AgentMessageHandler handler;@Autowiredprivate RedisTemplate<String, String> redisTemplate;@KafkaListener(topics = "agent-communication")public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {String msgId = record.headers().lastHeader("msgId").value().toString();// 1. 幂等性校验(防止重复消费)if (Boolean.TRUE.equals(redisTemplate.hasKey("msg:processed:" + msgId))) {ack.acknowledge();return;}// 2. 解析消息并处理AgentMessage.Async<?> message = parseMessage(record.value());try {handler.process(message);// 3. 标记为已处理redisTemplate.opsForValue().set("msg:processed:" + msgId, "1", 24, TimeUnit.HOURS);ack.acknowledge();} catch (Exception e) {// 4. 重试判断(未达最大重试次数则转发至重试队列)if (message.getRetryCount() < message.getMaxRetry()) {message.setRetryCount(message.getRetryCount() + 1);sendToRetryQueue(message);}ack.acknowledge(); // 避免重复拉取}}
      }
      

任务2:多Agent协同逻辑开发

实现步骤:
  1. 意图-召回协同逻辑

    • 意图Agent输出结构化实体信息,召回Agent基于实体类型精准召回:
      // 意图Agent输出增强(包含实体类型)
      public class IntentData {private String intent;private List<Entity> entities; // 实体包含类型信息@Datapublic static class Entity {private String name; // 如"苹果"private String type; // 如"fruit"或"electronic"private float score; // 实体置信度}
      }// 召回Agent处理逻辑
      @Service
      public class RecallAgentImpl implements RecallAgent {@Overridepublic RecallResult recall(IntentData data) {// 根据实体类型过滤召回池(如type=fruit则从生鲜库召回)List<String> goodsIds = goodsRepository.recallByEntityTypes(data.getEntities().stream().map(Entity::getType).collect(Collectors.toList()));return new RecallResult(goodsIds);}
      }
      
  2. 动态负载均衡扩展

    • 基于Dubbo自定义负载均衡策略(优先选择负载低的Agent实例):
      public class AgentLoadBalance extends AbstractLoadBalance {@Overrideprotected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {// 1. 获取各实例的当前负载(通过监控指标)List<Invoker<T>> availableInvokers = new ArrayList<>();for (Invoker<T> invoker : invokers) {// 从监控中心获取该实例的CPU使用率double cpuUsage = monitorService.getCpuUsage(invoker.getUrl().getHost());if (cpuUsage < 70) { // CPU使用率<70%视为可用availableInvokers.add(invoker);}}// 2. 从可用实例中随机选择(简单策略,可优化为加权)if (availableInvokers.isEmpty()) {return invokers.get(ThreadLocalRandom.current().nextInt(invokers.size()));} else {return availableInvokers.get(ThreadLocalRandom.current().nextInt(availableInvokers.size()));}}
      }
      

验证结果

  1. 交互协议功能验证

    • 同步通信:意图Agent调用召回Agent,传递“购买水果(实体类型=fruit)”数据,召回Agent返回10个生鲜类商品ID,响应时间45ms(≤500ms超时阈值)。
    • 异步通信:排序Agent发送商品排序结果至推荐理由生成Agent,Kafka消息延迟≤100ms,模拟消息丢失场景时,重试机制触发并成功重新投递。
    • 幂等性验证:重复发送3条相同msgId的消息,仅首次被处理,后续均被过滤(Redis去重生效)。
  2. 多Agent协同验证

    • 意图-召回协同:当用户查询“苹果”且实体类型识别为“fruit”时,召回结果中90%为水果类商品(对比无类型区分时的60%,精准度提升30%)。
    • 负载均衡:模拟排序Agent集群中1台实例CPU达80%,新请求自动分流至其他CPU<50%的实例,分流后各实例负载差≤15%。

🌟 大家好,我是“没事学AI”!
🤖 在AI的星辰大海里,我是那个执着的航海者,带着对智能的好奇不断探索。
📚 每一篇技术解析都是我打磨的罗盘,每一次模型实操都是我扬起的风帆。
💻 每一行代码演示都是我的航线记录,每一个案例拆解都是我的藏宝图绘制。
🚀 在人工智能的浪潮中,我既是领航员也是同行者。让我们一起,在AI学习的航程里,解锁更多AI的奥秘与可能——别忘了点赞、关注、收藏,跟上我的脚步,让“没事学AI”陪你从入门到精通!

http://www.lryc.cn/news/619673.html

相关文章:

  • 在es中安装kibana
  • 动静态库
  • ICCV 2025 | 4相机干掉480机位?CMU MonoFusion高斯泼溅重构4D人体!
  • 内容索引之word转md工具 - markitdown
  • (cvpr2025) IceDiff: 高分辨率北极海冰预报
  • duiLib 利用布局文件显示一个窗口并响应事件
  • 基于UniApp的新大陆物联网平台温湿度检测系统开发方案
  • 在JVM跑JavaScript脚本 | Oracle GraalJS 简介与实践
  • 【AI论文】GLM-4.5:具备智能体特性、推理能力与编码能力的(ARC)基础模型
  • Avalon-MM协议
  • 浅层神经网络
  • SimD小目标样本分配方法
  • 开发避坑指南(24):RocketMQ磁盘空间告急异常处理,CODE 14 “service not available“解决方案
  • 设计原则之【抽象层次一致性(SLAP)】,方法也分三六九等
  • 从零到一:TCP 回声服务器与客户端的完整实现与原理详解
  • Linux LNMP配置全流程
  • 机器学习之词向量转换
  • 第5章 学习的机制
  • 对比学习中核心损失函数的发展脉络
  • AI服务器需求激增,三星内存与SSD供不应求,HBM与DDR5成关键驱动力
  • 2025年高效能工程项目管理软件推荐榜单:AI重构工程进度可视化与资源动态调度体系
  • kernel pwn 入门(四) ret2dir详细
  • 《嵌入式Linux应用编程():Linux Framebuffer图形编程》
  • Win11和Mac设置环境变量
  • 机器学习阶段性总结:对深度学习本质的回顾 20250813
  • Html5-canvas动态渐变背景
  • mac 安卓模拟器 blueStacks
  • MacOS字体看起来比在 Windows 上更好?
  • 367. 有效的完全平方数
  • Spring Boot + MyBatis