当前位置: 首页 > news >正文

leveldb源码解析六——compact

compact分为manual_compaction、minor_compaction、major_compaction,统一由MaybeScheduleCompaction触发:

void DBImpl::MaybeScheduleCompaction() {mutex_.AssertHeld();if (background_compaction_scheduled_) {// Already scheduled} else if (shutting_down_.load(std::memory_order_acquire)) {// DB is being deleted; no more background compactions} else if (!bg_error_.ok()) {// Already got an error; no more changes} else if (imm_ == nullptr && manual_compaction_ == nullptr &&!versions_->NeedsCompaction()) {// No work to be done} else {background_compaction_scheduled_ = true;env_->Schedule(&DBImpl::BGWork, this);}
}

MaybeScheduleCompaction的调用时机有以下几种:
1、用户调用Get接口时,在查询过程中,可能会更新sstable的seek信息,触发compaction
2、用户在遍历db时,会更新sstable之间key重复的信息,触发compaction
3、用户调用Put接口时,当memtable写满之后,转化为imemtable,触发minor_compaction
4、每次做完compaction之后,会产生新的sstable,更新sstable相关信息,可能会再次出发compaction

minor_compaction

当有immetable时,就会触发minor_compaction:

void DBImpl::BackgroundCompaction() {...if (imm_ != nullptr) {CompactMemTable();return;}...
}
void DBImpl::CompactMemTable() {mutex_.AssertHeld();assert(imm_ != nullptr);// Save the contents of the memtable as a new TableVersionEdit edit;Version* base = versions_->current();base->Ref();// immetable转化为sstableStatus s = WriteLevel0Table(imm_, &edit, base);base->Unref();if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {s = Status::IOError("Deleting DB during memtable compaction");}// Replace immutable memtable with the generated Tableif (s.ok()) {// 启用新的WALedit.SetPrevLogNumber(0);edit.SetLogNumber(logfile_number_);  // Earlier logs no longer neededs = versions_->LogAndApply(&edit, &mutex_);}if (s.ok()) {// Commit to the new state// 清理多余的文件imm_->Unref();imm_ = nullptr;has_imm_.store(false, std::memory_order_release);RemoveObsoleteFiles();} else {RecordBackgroundError(s);}
}

WriteLevel0Table通过遍历immetable,构建新的sstable,然后为新生产的sstable选择放置的level:
1、如果与level0的sstable有重叠,则放置在level0
2、如果level+1层sstable与该sstable key有重叠,那么放置在level层
3、如果level+2层sstable与sstable key的重叠范围大于阈值,那么放置在level层
1、2项是正确性问题,3项是为了防止后续再level层做compaction时,需要merge的KV对太多

major_compaction

major_compaction根据current_->compaction_score_、current_->file_to_compact_ 判断是否要进行,这两个值在下面情形中更新:
1、用户调用Get接口时,如果需要查询sstable,在查询到最高层level时,会记录下level的sstable,并且更新信息,每1MB数据允许查询1次,如果改sstable被seek的次数过多,就需要compaction,每1MB数据seek 1次时因为作者认为seek一次sstable的时间代价与compaction 1MB的时间代价相同
2、用户遍历DB时,会判断key重复的次数,超过阈值时,就会记录到current_->file_to_compact_中
3、每次compaction结束之后,会重新计算每层sstable是否需要再一次compaction,这个值记录在current_->file_to_compact_。
在compaction之前,需要选择compaction的sstable以及对应的level:对于current_->compaction_score_策略,需要知道对应的sstable,从记录上上次该level上结束的key开始,如果上次没有,则从最小的key开始找sstable,对于level0,由于key不重叠,需要找到所有与sstable重叠的sstable。
初步找到要compaction的sstable之后,需要尽可能的在该level或者level+1上找到与该sstable有重叠的sstable,一起做compaction:

Compaction* VersionSet::PickCompaction() {Compaction* c;int level;// We prefer compactions triggered by too much data in a level over// the compactions triggered by seeks.const bool size_compaction = (current_->compaction_score_ >= 1);const bool seek_compaction = (current_->file_to_compact_ != nullptr);if (size_compaction) {level = current_->compaction_level_;assert(level >= 0);assert(level + 1 < config::kNumLevels);c = new Compaction(options_, level);// Pick the first file that comes after compact_pointer_[level]for (size_t i = 0; i < current_->files_[level].size(); i++) {FileMetaData* f = current_->files_[level][i];if (compact_pointer_[level].empty() ||icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {c->inputs_[0].push_back(f);break;}}if (c->inputs_[0].empty()) {// Wrap-around to the beginning of the key spacec->inputs_[0].push_back(current_->files_[level][0]);}} else if (seek_compaction) {level = current_->file_to_compact_level_;c = new Compaction(options_, level);c->inputs_[0].push_back(current_->file_to_compact_);} else {return nullptr;}c->input_version_ = current_;c->input_version_->Ref();// Files in level 0 may overlap each other, so pick up all overlapping onesif (level == 0) {InternalKey smallest, largest;GetRange(c->inputs_[0], &smallest, &largest);// Note that the next call will discard the file we placed in// c->inputs_[0] earlier and replace it with an overlapping set// which will include the picked file.current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);assert(!c->inputs_[0].empty());}SetupOtherInputs(c);return c;
}

接下来就是做compaction,先选择compaction的最小seq,保证那些snapshot的key不会被compact掉,然后对于选中的sstable做归并排序,并且merge掉那些被delete的key、seq小的key:

//进行compact
Status DBImpl::DoCompactionWork(CompactionState* compact) {const uint64_t start_micros = env_->NowMicros();int64_t imm_micros = 0;  // Micros spent doing imm_ compactionsLog(options_.info_log,  "Compacting %d@%d + %d@%d files",compact->compaction->num_input_files(0),compact->compaction->level(),compact->compaction->num_input_files(1),compact->compaction->level() + 1);//进行一些前置条件保障assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);assert(compact->builder == NULL);assert(compact->outfile == NULL);// 设置compact的最小版本号if (snapshots_.empty()) {compact->smallest_snapshot = versions_->LastSequence();} else {compact->smallest_snapshot = snapshots_.oldest()->number_;}// Release mutex while we're actually doing the compaction work//compact需要的信息都已经生成完毕,此时可以放开锁,从而主线程可以继续修改versionset和当前versionmutex_.Unlock();Iterator* input = versions_->MakeInputIterator(compact->compaction);input->SeekToFirst();Status status;ParsedInternalKey ikey;std::string current_user_key;bool has_current_user_key = false;SequenceNumber last_sequence_for_key = kMaxSequenceNumber;for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {// Prioritize immutable compaction work// 每一次for循环中都优先进行imemtable的compactif (has_imm_.NoBarrier_Load() != NULL) {const uint64_t imm_start = env_->NowMicros();mutex_.Lock();if (imm_ != NULL) {CompactMemTable();bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary}mutex_.Unlock();imm_micros += (env_->NowMicros() - imm_start);}Slice key = input->key();if (compact->compaction->ShouldStopBefore(key) &&compact->builder != NULL) {status = FinishCompactionOutputFile(compact, input);if (!status.ok()) {break;}}// Handle key/value, add to state, etc.bool drop = false;if (!ParseInternalKey(key, &ikey)) {// Do not hide error keyscurrent_user_key.clear();has_current_user_key = false;last_sequence_for_key = kMaxSequenceNumber;} else {if (!has_current_user_key ||user_comparator()->Compare(ikey.user_key,Slice(current_user_key)) != 0) {// First occurrence of this user keycurrent_user_key.assign(ikey.user_key.data(), ikey.user_key.size());has_current_user_key = true;last_sequence_for_key = kMaxSequenceNumber;}if (last_sequence_for_key <= compact->smallest_snapshot) {// Hidden by an newer entry for same user key// 这种情况下,说明last_sequence_for_key != kMaxSequenceNumber,即遍历过相同的key// (不满足上面一个if的条件),相同的key,如果小于最新的snapshot,即没有snapshot持有它// 则可以删除了drop = true;    // (A)} else if (ikey.type == kTypeDeletion &&ikey.sequence <= compact->smallest_snapshot &&compact->compaction->IsBaseLevelForKey(ikey.user_key)) {// For this user key:// (1) there is no data in higher levels (更高的level有数据,代表可能有更旧的数据,这里再删除会出错)// (2) data in lower levels will have larger sequence numbers// (3) data in layers that are being compacted here and have//     smaller sequence numbers will be dropped in the next//     few iterations of this loop (by rule (A) above).// Therefore this deletion marker is obsolete and can be dropped.drop = true;}last_sequence_for_key = ikey.sequence;}
#if 0Log(options_.info_log,"  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, ""%d smallest_snapshot: %d",ikey.user_key.ToString().c_str(),(int)ikey.sequence, ikey.type, kTypeValue, drop,compact->compaction->IsBaseLevelForKey(ikey.user_key),(int)last_sequence_for_key, (int)compact->smallest_snapshot);
#endifif (!drop) {// Open output file if necessaryif (compact->builder == NULL) {status = OpenCompactionOutputFile(compact);if (!status.ok()) {break;}}if (compact->builder->NumEntries() == 0) {compact->current_output()->smallest.DecodeFrom(key);}compact->current_output()->largest.DecodeFrom(key);compact->builder->Add(key, input->value());// Close output file if it is big enoughif (compact->builder->FileSize() >=compact->compaction->MaxOutputFileSize()) {status = FinishCompactionOutputFile(compact, input);if (!status.ok()) {break;}}}input->Next();}if (status.ok() && shutting_down_.Acquire_Load()) {status = Status::IOError("Deleting DB during compaction");}if (status.ok() && compact->builder != NULL) {status = FinishCompactionOutputFile(compact, input);}if (status.ok()) {status = input->status();}delete input;input = NULL;// 统计本次compaction的读写量CompactionStats stats;stats.micros = env_->NowMicros() - start_micros - imm_micros;for (int which = 0; which < 2; which++) {for (int i = 0; i < compact->compaction->num_input_files(which); i++) {stats.bytes_read += compact->compaction->input(which, i)->file_size;}}for (size_t i = 0; i < compact->outputs.size(); i++) {stats.bytes_written += compact->outputs[i].file_size;}mutex_.Lock();stats_[compact->compaction->level() + 1].Add(stats);if (status.ok()) {status = InstallCompactionResults(compact);}if (!status.ok()) {RecordBackgroundError(status);}VersionSet::LevelSummaryStorage tmp;Log(options_.info_log,"compacted to: %s", versions_->LevelSummary(&tmp));return status;
}

manual_compaction

manul_compaction通过:

  // Compact the underlying storage for the key range [*begin,*end].// In particular, deleted and overwritten versions are discarded,// and the data is rearranged to reduce the cost of operations// needed to access the data.  This operation should typically only// be invoked by users who understand the underlying implementation.//// begin==nullptr is treated as a key before all keys in the database.// end==nullptr is treated as a key after all keys in the database.// Therefore the following call will compact the entire database://    db->CompactRange(nullptr, nullptr);virtual void CompactRange(const Slice* begin, const Slice* end) = 0;

接口来指定compaction的key范围,从这个范围中回去对应key的sstable,然后做compaction

http://www.lryc.cn/news/3475.html

相关文章:

  • 数据结构(二):单向链表、双向链表
  • COCO物体检测评测方法简介
  • 记一次上环境获取资源失败的案例
  • 实战超详细MySQL8离线安装
  • 依赖倒置原则|SOLID as a rock
  • Webpack的知识要点
  • handler解析(2) -Handler源码解析
  • 【算法】kmp
  • git 常用命令之 git checkout
  • 一些常见错误
  • [单片机框架][调试功能] 回溯案发现场
  • MySQL主从同步-(二)搭建从机服务器
  • Linux系列 备份与分享文档
  • SNI生效条件 - 补充nginx-host绕过实例复现中SNI绕过的先决条件
  • 傻白探索Chiplet,Modular Routing Design for Chiplet-based Systems(十一)
  • C语言静态库、动态库的封装和注意事项
  • MyBatis-Plus分页插件和MyBatisX插件
  • 年前无情被裁,面试大厂的这几个月…
  • 基于Java的分片上传功能
  • KDS安装步骤
  • JavaSE-线程池(1)- 线程池概念
  • 开源代码的寿命为何只有1年?
  • 完善登录功能--过滤器的使用
  • CSS基础:属性和关系选择器
  • 设计模式:原型模式解决对象创建成本大问题
  • 驱动开发(二)
  • 《狂飙》大结局,这22句经典台词值得细品
  • 【计算机网络期末复习】第二章 物理层
  • 多核异构核间通信-mailbox/RPMsg 介绍及实验
  • 【Rust日报】2023-02-11 从头开始构建云数据库 RisingWave - 为什么我们从 C++ 转向 Rust...