milvus Delete API流程源码分析
Delete API执行流程源码解析
milvus版本:v2.3.2
整体架构:
Delete 的数据流向:
1.客户端sdk发出Delete API请求。
from pymilvus import (
    connections,
    Collection,
)

print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

hello_milvus = Collection("hello_milvus")

print("Start delete entities")
## expr = "book_id in [0,1]" 主键
expr = "pk in [447868867306324066,447868867306324067]" ## 非主键
delete_result = hello_milvus.delete(expr)
print(delete_result)
2.服务端接收API请求,将request封装为deleteTask,并压入dmQueue队列。
注意这里是dmQueue。DDL类型的是ddQueue。
代码路径:internal\proxy\impl.go
// Delete delete records from collection, then these records cannot be searched.
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {
	......
	// request封装为deleteTask
	dt := &deleteTask{
		ctx:         ctx,
		Condition:   NewTaskCondition(ctx),
		req:         request,
		idAllocator: node.rowIDAllocator,
		chMgr:       node.chMgr,
		chTicker:    node.chTicker,
		lb:          node.lbPolicy,
	}
	......
	// 将task压入dmQueue队列
	// MsgID will be set by Enqueue()
	if err := node.sched.dmQueue.Enqueue(dt); err != nil {
		......
	}
	......
	// 等待任务执行完
	if err := dt.WaitToFinish(); err != nil {
		......
	}
	......
}
DeleteRequest数据结构:
type DeleteRequest struct {
	Base                 *commonpb.MsgBase
	DbName               string
	CollectionName       string
	PartitionName        string
	Expr                 string
	HashKeys             []uint32
	XXX_NoUnkeyedLiteral struct{}
	XXX_unrecognized     []byte
	XXX_sizecache        int32
}
3.执行deleteTask的3个方法PreExecute、Execute、PostExecute。
PreExecute()一般为参数校验等工作。
Execute()为真正执行逻辑。
PostExecute()执行完后的逻辑,什么都不做,返回nil。
代码路径:internal\proxy\task_delete.go
func (dt *deleteTask) Execute(ctx context.Context) (err error) {
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Delete-Execute")
	defer sp.End()
	log := log.Ctx(ctx)

	if len(dt.req.GetExpr()) == 0 {
		return merr.WrapErrParameterInvalid("valid expr", "empty expr", "invalid expression")
	}

	dt.tr = timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute delete %d", dt.ID()))
	// 拿到stream,类型为msgstream.mqMsgStream
	stream, err := dt.chMgr.getOrCreateDmlStream(dt.collectionID)
	if err != nil {
		return err
	}

	plan, err := planparserv2.CreateRetrievePlan(dt.schema, dt.req.Expr)
	if err != nil {
		return fmt.Errorf("failed to create expr plan, expr = %s", dt.req.GetExpr())
	}

	// 判断走simpleDelete还是complexDelete
	isSimple, termExp := getExpr(plan)
	if isSimple {
		// if could get delete.primaryKeys from delete expr
		err := dt.simpleDelete(ctx, termExp, stream)
		if err != nil {
			return err
		}
	} else {
		// if get complex delete expr
		// need query from querynode before delete
		err = dt.complexDelete(ctx, plan, stream)
		if err != nil {
			log.Warn("complex delete failed,but delete some data", zap.Int("count", dt.count), zap.String("expr", dt.req.GetExpr()))
			return err
		}
	}

	return nil
}
expr如果是主键表达式则走simpleDelete,否则走complexDelete。
4.simpleDelete
func (dt *deleteTask) simpleDelete(ctx context.Context, termExp *planpb.Expr_TermExpr, stream msgstream.MsgStream) error {
	primaryKeys, numRow, err := getPrimaryKeysFromExpr(dt.schema, termExp)
	if err != nil {
		log.Info("Failed to get primary keys from expr", zap.Error(err))
		return err
	}
	log.Debug("get primary keys from expr",
		zap.Int64("len of primary keys", numRow),
		zap.Int64("collectionID", dt.collectionID),
		zap.Int64("partationID", dt.partitionID))
	err = dt.produce(ctx, stream, primaryKeys)
	if err != nil {
		return err
	}
	return nil
}
函数getPrimaryKeysFromExpr()返回的主键类型为*schemapb.IDs,其定义如下:
type IDs struct {
	// Types that are valid to be assigned to IdField:
	//
	//	*IDs_IntId
	//	*IDs_StrId
	IdField              isIDs_IdField
	XXX_NoUnkeyedLiteral struct{}
	XXX_unrecognized     []byte
	XXX_sizecache        int32
}
isIDs_IdField是一个接口类型。
type isIDs_IdField interface {
	isIDs_IdField()
}
isIDs_IdField有2个实现:
- IDs_IntId
- IDs_StrId
type IDs_IntId struct {
	IntId *LongArray
}

type LongArray struct {
	Data                 []int64
	XXX_NoUnkeyedLiteral struct{}
	XXX_unrecognized     []byte
	XXX_sizecache        int32
}

type IDs_StrId struct {
	StrId *StringArray
}

type StringArray struct {
	Data                 []string
	XXX_NoUnkeyedLiteral struct{}
	XXX_unrecognized     []byte
	XXX_sizecache        int32
}
从expr提取主键存储到变量primaryKeys。
5.dt.produce()
func (dt *deleteTask) produce(ctx context.Context, stream msgstream.MsgStream, primaryKeys *schemapb.IDs) error {
	// 根据vchannels计算hash
	hashValues := typeutil.HashPK2Channels(primaryKeys, dt.vChannels)
	// repack delete msg by dmChannel
	result := make(map[uint32]msgstream.TsMsg)
	numRows := int64(0)
	for index, key := range hashValues {
		vchannel := dt.vChannels[key]
		_, ok := result[key]
		if !ok {
			// 创建deleteMsg
			deleteMsg, err := dt.newDeleteMsg(ctx)
			if err != nil {
				return err
			}
			deleteMsg.ShardName = vchannel
			result[key] = deleteMsg
		}
		curMsg := result[key].(*msgstream.DeleteMsg)
		curMsg.HashValues = append(curMsg.HashValues, hashValues[index])
		curMsg.Timestamps = append(curMsg.Timestamps, dt.ts)
		typeutil.AppendIDs(curMsg.PrimaryKeys, primaryKeys, index)
		curMsg.NumRows++
		numRows++
	}

	// send delete request to log broker
	msgPack := &msgstream.MsgPack{
		BeginTs: dt.BeginTs(),
		EndTs:   dt.EndTs(),
	}
	// 将deleteMsg包装进msgPack
	for _, msg := range result {
		if msg != nil {
			msgPack.Msgs = append(msgPack.Msgs, msg)
		}
	}

	log.Debug("send delete request to virtual channels",
		zap.String("collectionName", dt.req.GetCollectionName()),
		zap.Int64("collectionID", dt.collectionID),
		zap.Strings("virtual_channels", dt.vChannels),
		zap.Int64("taskID", dt.ID()),
		zap.Duration("prepare duration", dt.tr.RecordSpan()))
	// 发送给mq
	err := stream.Produce(msgPack)
	if err != nil {
		return err
	}
	dt.result.DeleteCnt += numRows
	return nil
}
msgstream.TsMsg是一个接口类型。
有如下实现:
- CreateCollectionMsg
- CreateDatabaseMsg
- CreateIndexMsg
- CreatePartitionMsg
- DataNodeTtMsg
- DeleteMsg
- DropCollectionMsg
- DropDatabaseMsg
- DropIndexMsg
- DropPartitionMsg
- FlushMsg
- InsertMsg
- LoadCollectionMsg
- ReleaseCollectionMsg
- TimeTickMsg
6.complexDelete
func (dt *deleteTask) complexDelete(ctx context.Context, plan *planpb.PlanNode, stream msgstream.MsgStream) error {
	err := dt.lb.Execute(ctx, CollectionWorkLoad{
		db:             dt.req.GetDbName(),
		collectionName: dt.req.GetCollectionName(),
		collectionID:   dt.collectionID,
		nq:             1,
		exec:           dt.getStreamingQueryAndDelteFunc(stream, plan),
	})
	if err != nil {
		log.Warn("fail to get or create dml stream", zap.Error(err))
		return err
	}
	return nil
}
最终会执行dt.getStreamingQueryAndDelteFunc。
这个函数会调用:
dt.produce(ctx, stream, result.GetIds())
simpleDelete最终调用的也是dt.produce()这个函数。
complexDelete会根据expr查询出主键,然后根据主键进行删除数据。
7.总结
- delete api根据expr决定走simpleDelete还是complexDelete:expr是主键表达式时走simpleDelete,否则走complexDelete。
- complexDelete会先根据expr查询出主键,最终与simpleDelete一样调用dt.produce(),按主键将删除消息发送到mq。