当前位置: 首页 > news >正文

milvus Delete API流程源码分析

Delete API执行流程源码解析

milvus版本:v2.3.2

整体架构:

在这里插入图片描述

Delete 的数据流向:

在这里插入图片描述

1.客户端sdk发出Delete API请求。

from pymilvus import (connections,Collection,
)print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")hello_milvus = Collection("hello_milvus")print("Start delete entities")
## expr = "book_id in [0,1]" 主键
expr = "pk in [447868867306324066,447868867306324067]" ## 非主键
delete_result = hello_milvus.delete(expr)
print(delete_result)

2.服务端接受API请求,将request封装为deleteTask,并压入dmQueue队列。

注意这里是dmQueue。DDL类型的是ddQueue。

代码路径:internal\proxy\impl.go

// Delete delete records from collection, then these records cannot be searched.
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {......// request封装为deleteTaskdt := &deleteTask{ctx:         ctx,Condition:   NewTaskCondition(ctx),req:         request,idAllocator: node.rowIDAllocator,chMgr:       node.chMgr,chTicker:    node.chTicker,lb:          node.lbPolicy,}......// 将task压入dmQueue队列// MsgID will be set by Enqueue()if err := node.sched.dmQueue.Enqueue(dt); err != nil {......}......// 等待任务执行完if err := dt.WaitToFinish(); err != nil {......}......
}

DeleteRequest数据结构:

type DeleteRequest struct {Base                 *commonpb.MsgBaseDbName               stringCollectionName       stringPartitionName        stringExpr                 stringHashKeys             []uint32XXX_NoUnkeyedLiteral struct{}XXX_unrecognized     []byteXXX_sizecache        int32
}

3.执行deleteTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task_delete.go

func (dt *deleteTask) Execute(ctx context.Context) (err error) {ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Delete-Execute")defer sp.End()log := log.Ctx(ctx)if len(dt.req.GetExpr()) == 0 {return merr.WrapErrParameterInvalid("valid expr", "empty expr", "invalid expression")}dt.tr = timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute delete %d", dt.ID()))// 拿到stream,类型为msgstream.mqMsgStreamstream, err := dt.chMgr.getOrCreateDmlStream(dt.collectionID)if err != nil {return err}plan, err := planparserv2.CreateRetrievePlan(dt.schema, dt.req.Expr)if err != nil {return fmt.Errorf("failed to create expr plan, expr = %s", dt.req.GetExpr())}// 判断走simpleDelete还是complexDeleteisSimple, termExp := getExpr(plan)if isSimple {// if could get delete.primaryKeys from delete exprerr := dt.simpleDelete(ctx, termExp, stream)if err != nil {return err}} else {// if get complex delete expr// need query from querynode before deleteerr = dt.complexDelete(ctx, plan, stream)if err != nil {log.Warn("complex delete failed,but delete some data", zap.Int("count", dt.count), zap.String("expr", dt.req.GetExpr()))return err}}return nil
}

expr如果是主键表达式则走simpleDelete,否则走complexDelete。

4.simpleDelete

func (dt *deleteTask) simpleDelete(ctx context.Context, termExp *planpb.Expr_TermExpr, stream msgstream.MsgStream) error {primaryKeys, numRow, err := getPrimaryKeysFromExpr(dt.schema, termExp)if err != nil {log.Info("Failed to get primary keys from expr", zap.Error(err))return err}log.Debug("get primary keys from expr",zap.Int64("len of primary keys", numRow),zap.Int64("collectionID", dt.collectionID),zap.Int64("partationID", dt.partitionID))err = dt.produce(ctx, stream, primaryKeys)if err != nil {return err}return nil
}

函数getPrimaryKeysFromExpr()的返回schemapb.IDs。

type IDs struct {// Types that are valid to be assigned to IdField:////	*IDs_IntId//	*IDs_StrIdIdField              isIDs_IdFieldXXX_NoUnkeyedLiteral struct{}XXX_unrecognized     []byteXXX_sizecache        int32
}

isIDs_IdField是一个接口类型。

type isIDs_IdField interface {isIDs_IdField()
}

isIDs_IdField有2个实现:

  • IDs_IntId
  • IDs_StrId
type IDs_IntId struct {IntId *LongArray
}type LongArray struct {Data                 []int64XXX_NoUnkeyedLiteral struct{}XXX_unrecognized     []byteXXX_sizecache        int32
}type IDs_StrId struct {StrId *StringArray
}type StringArray struct {Data                 []stringXXX_NoUnkeyedLiteral struct{}XXX_unrecognized     []byteXXX_sizecache        int32
}

在这里插入图片描述

从expr提取主键存储到变量primaryKeys。

5.dt.produce()

func (dt *deleteTask) produce(ctx context.Context, stream msgstream.MsgStream, primaryKeys *schemapb.IDs) error {// 根据vchannels计算hashhashValues := typeutil.HashPK2Channels(primaryKeys, dt.vChannels)// repack delete msg by dmChannelresult := make(map[uint32]msgstream.TsMsg)numRows := int64(0)for index, key := range hashValues {vchannel := dt.vChannels[key]_, ok := result[key]if !ok {// 创建deleteMsgdeleteMsg, err := dt.newDeleteMsg(ctx)if err != nil {return err}deleteMsg.ShardName = vchannelresult[key] = deleteMsg}curMsg := result[key].(*msgstream.DeleteMsg)curMsg.HashValues = append(curMsg.HashValues, hashValues[index])curMsg.Timestamps = append(curMsg.Timestamps, dt.ts)typeutil.AppendIDs(curMsg.PrimaryKeys, primaryKeys, index)curMsg.NumRows++numRows++}// send delete request to log brokermsgPack := &msgstream.MsgPack{BeginTs: dt.BeginTs(),EndTs:   dt.EndTs(),}// 将deleteMsg包装进msgPackfor _, msg := range result {if msg != nil {msgPack.Msgs = append(msgPack.Msgs, msg)}}log.Debug("send delete request to virtual channels",zap.String("collectionName", dt.req.GetCollectionName()),zap.Int64("collectionID", dt.collectionID),zap.Strings("virtual_channels", dt.vChannels),zap.Int64("taskID", dt.ID()),zap.Duration("prepare duration", dt.tr.RecordSpan()))// 发送给mqerr := stream.Produce(msgPack)if err != nil {return err}dt.result.DeleteCnt += numRowsreturn nil
}

msgstream.TsMsg是一个接口类型。

有如下实现:

  • createCollectionMsg
  • CreateDatabaseMsg
  • CreateIndexMsg
  • createPartitionMsg
  • DataNodeTtMsg
  • DeleteMsg
  • DropCollectionMsg
  • DropDatabaseMsg
  • DropIndexMsg
  • DropPartitionMsg
  • FlushMsg
  • InsertMsg
  • LoadCollectionMsg
  • ReleaseCollectionMsg
  • TimeTickMsg

6.complexDelete

func (dt *deleteTask) complexDelete(ctx context.Context, plan *planpb.PlanNode, stream msgstream.MsgStream) error {err := dt.lb.Execute(ctx, CollectionWorkLoad{db:             dt.req.GetDbName(),collectionName: dt.req.GetCollectionName(),collectionID:   dt.collectionID,nq:             1,exec:           dt.getStreamingQueryAndDelteFunc(stream, plan),})if err != nil {log.Warn("fail to get or create dml stream", zap.Error(err))return err}return nil
}

最终会执行dt.getStreamingQueryAndDelteFunc。

这个函数会调用:

dt.produce(ctx, stream, result.GetIds())

simpleDelete也是调用这个函数。

complexDelete会根据expr查询出主键,然后根据主键进行删除数据。

7.总结

  • delete api根据expr走simpleDelete还是complexDelete。
  • complexDelete最终也会转化为simpleDelete。
http://www.lryc.cn/news/304516.html

相关文章:

  • CentOS使用Docker搭建Halo网站并实现无公网ip远程访问
  • 【JVM】垃圾回收算法
  • 如何和将原始request的Header中的值传递给openfeign请求的Header? 以及又如何获取openfeign请求中Header中的值
  • Flink 侧输出流(SideOutput)
  • C语言中关于#include的一些小知识
  • DSP芯片 机器码下载方法 【主要 “扯” 用Uniflash下载的方法】
  • 速盾网络:CDN用几天关了可以吗?安全吗?
  • MR混合现实情景实训教学系统在高空作业课堂中的应用
  • Windows系统中定时执行python脚本
  • HashMap 源码学习-jdk1.8
  • WebStorm 2023:让您更接近理想的开发环境 mac/win版
  • java面试题:数字与字母的映射表
  • Jmeter教程-JMeter 环境安装及配置
  • 十大基础排序算法
  • IP协议及相关技术协议
  • 小红书x-s算法及补环境 单旋转验证码
  • 代码检测规范和git提交规范
  • Elasticsearch:什么是搜索引擎?
  • 人工智能几个关键节点:深蓝,AlphaGo,ChatGPT,Sora
  • WordPres Bricks Builder 前台RCE漏洞复现(CVE-2024-25600)
  • 代码随想录算法训练营总结 | 慢慢总结,想起啥就先写上
  • 基于开源模型对文本和音频进行情感分析
  • SQL中为什么不要使用1=1
  • python 几种常见的音频数据读取、保存方式
  • 关于msvcr120.dll丢失怎样修复的详细解决步骤方法分享,msvcr120.dll文件的相关内容
  • 简单几步通过DD工具把云服务器系统Linux改为windows
  • 使用 package.json 配置代理解决 React 项目中的跨域请求问题
  • 生成 Let‘s Encrypt 免费证书
  • int128的实现(基本完成)
  • 【linux】使用 acme.sh 实现了 acme 协议生成免费的SSL 证书