当前位置: 首页 > news >正文

milvus upsert流程源码分析

milvus版本:v2.3.2

整体架构:

在这里插入图片描述

Upsert 的数据流向:

在这里插入图片描述

1.客户端sdk发出Upsert API请求。

import numpy as np
from pymilvus import (connections,Collection,
)num_entities, dim = 4, 3print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")hello_milvus = Collection("hello_milvus")print("Start upsert entities")
rng = np.random.default_rng(seed=19530)
entities = [[0,1,2,4000],[10,11,12,4000],rng.random((num_entities, dim)),
]
hello_milvus.upsert(entities)

2.服务端接受API请求,将request封装为upsertTask,并压入dmQueue队列。

注意这里是dmQueue。DDL类型的是ddQueue。

代码路径:internal\proxy\impl.go

// Upsert upsert records into collection.
func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {......// request封装为upsertTaskit := &upsertTask{baseMsg: msgstream.BaseMsg{HashValues: request.HashKeys,},ctx:       ctx,Condition: NewTaskCondition(ctx),req:       request,result: &milvuspb.MutationResult{Status: merr.Success(),IDs: &schemapb.IDs{IdField: nil,},},idAllocator:   node.rowIDAllocator,segIDAssigner: node.segAssigner,chMgr:         node.chMgr,chTicker:      node.chTicker,}......// 将task压入dmQueue队列if err := node.sched.dmQueue.Enqueue(it); err != nil {......}......// 等待任务执行完if err := it.WaitToFinish(); err != nil {......}......
}

3.执行upsertTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task_upsert.go

func (it *upsertTask) Execute(ctx context.Context) (err error) {ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert-Execute")defer sp.End()log := log.Ctx(ctx).With(zap.String("collectionName", it.req.CollectionName))tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute upsert %d", it.ID()))// 拿到stream,类型为msgstream.mqMsgStreamstream, err := it.chMgr.getOrCreateDmlStream(it.collectionID)if err != nil {return err}// 创建msgPackmsgPack := &msgstream.MsgPack{BeginTs: it.BeginTs(),EndTs:   it.EndTs(),}// 添加insertMsgPackerr = it.insertExecute(ctx, msgPack)if err != nil {log.Warn("Fail to insertExecute", zap.Error(err))return err}// 添加deleteMsgPackerr = it.deleteExecute(ctx, msgPack)if err != nil {log.Warn("Fail to deleteExecute", zap.Error(err))return err}tr.RecordSpan()// 发送数据至mqerr = stream.Produce(msgPack)if err != nil {it.result.Status = merr.Status(err)return err}sendMsgDur := tr.RecordSpan()metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds()))totalDur := tr.ElapseSpan()log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()),zap.Duration("total duration", totalDur))return nil
}

msgPack变量:

在这里插入图片描述

msgPack包含了insertRequest和deleteRequest。

在这里插入图片描述

insertRequest包含了客户端的upsert数据,以及还会有rowid,用来唯一标识一列数据。

在这里插入图片描述

deleteRequest包含主键值。

http://www.lryc.cn/news/307693.html

相关文章:

  • QT网络通信
  • 案例分析|山西某光伏发电站轨道巡检机器人解决方案
  • Apache POl
  • 高防服务器托管应注意什么
  • swagger-ui.html报错404,解决办法
  • golang 函数式编程库samber/mo使用: Future
  • 【Spring连载】使用Spring Data访问 MongoDB(十四)----Mongodb特有的查询方法
  • 消息中间件篇之RabbitMQ-消息重复消费
  • 常见设计模式之单例模式
  • VL817-Q7 USB3.0 HUB芯片 适用于扩展坞 工控机 显示器
  • 【Android安全】Windows 环境下载 AOSP 源码
  • Vue.js+SpringBoot开发快递管理系统
  • Linux/Spectra
  • C 嵌入式系统设计模式 08:硬件代理模式
  • 【k8s配置与存储--持久化存储(PV、PVC、存储类)】
  • 【Vite】解决Vite http proxy error: Error: connect ECONNREFUSED
  • FPGA领域顶级学术会议
  • 罗技鼠标滚轮模式介绍 | 鼠标滚轮异响 - 解决方案
  • Scrapy与分布式开发(2.2):正则表达式
  • 今年“全国爱耳日”主题确定!立聪堂助听器组织社区义诊
  • 区块链智能合约开发
  • Android 启动流程及 init 进程解析
  • Java设计模式:核心概述(一)
  • 计算机网络:IP
  • CSS中使用变量的两个函数var和calc
  • 了解docker与k8s
  • 服务器防火墙的应用技术有哪些
  • 打开 Camera app 出图,前几帧图像偏暗、偏色该怎样去避免?
  • SD-WAN技术:优化国内外服务器访问的关键
  • 【MySQL】学习和总结标量子查询