leveldb的Compaction线程
个人随笔 (Owed by: 春夜喜雨 http://blog.csdn.net/chunyexiyu)
1. leveldb的Compaction全局线程
在leveldb中,有一个全局的后台线程BGThread,用于数据库的MinorCompact与MajorCompact。
重点关注“全局线程”:
这个标识着无论一个进程打开多少个leveldb库,该Compact线程只有一个;
如果在一个程序中,打开了许多库在读写,多个库都触发了MinorCompact,那么这些MinorCompact将在Compact线程中依次执行;
也因为是全局线程,所以需要触发minorcompact的db线程,把db的this指针传递给Compact线程中来。
基于此,来看一下代码上的实现:
2. 任务Schedule部分,建立了一个创建一次的全局子线程
通过started_backgroud_thread_来标识是否已创建;
定义了一个std::thread backgroud_thread,并detach掉,形成一个全局的子线程;
把任务函数与参数放入到backgroud_work_queue_中;
void WindowsEnv::Schedule(void (*background_work_function)(void* background_work_arg),void* background_work_arg) {background_work_mutex_.Lock();// Start the background thread, if we haven't done so already.if (!started_background_thread_) {started_background_thread_ = true;std::thread background_thread(WindowsEnv::BackgroundThreadEntryPoint, this);background_thread.detach();}// If the queue is empty, the background thread may be waiting for work.if (background_work_queue_.empty()) {background_work_cv_.Signal();}background_work_queue_.emplace(background_work_function, background_work_arg);background_work_mutex_.Unlock();
}
3. 任务线程执行部分,不断的从队列中取任务函数与参数,并执行任务函数
从backgroud_work_queue_中取出任务函数与参数,执行函数;
其中为了避免queue中一直没有数据一直取的情况,使用了一个backgroupd_work_cv来在为空时等待;
static void BackgroundThreadEntryPoint(WindowsEnv* env) {env->BackgroundThreadMain();
}
void WindowsEnv::BackgroundThreadMain() {while (true) {background_work_mutex_.Lock();// Wait until there is work to be done.while (background_work_queue_.empty()) {background_work_cv_.Wait();}assert(!background_work_queue_.empty());auto background_work_function = background_work_queue_.front().function;void* background_work_arg = background_work_queue_.front().arg;background_work_queue_.pop();background_work_mutex_.Unlock();background_work_function(background_work_arg);}
}
4. 在需要触发Compact的地方,传入任务函数BGWork与当前库的指针
调用MaybeScheduleCompaction,在其中判定一个需要Compact的场景,排除掉一些不需要发起Compact的场景:
- 已经在执行Compact的场景除外;
- 已经关闭完毕的场景除外;
- 该数据库后台已经有问题的场景除外;
- 没有需要Compact的场景除外:没有待执行的Minorcompact+也没有待执行的手动Compaction+没有待执行的size-compaction/file-compaction;
void DBImpl::MaybeScheduleCompaction() {mutex_.AssertHeld();if (background_compaction_scheduled_) {// Already scheduled}else if (shutting_down_.load(std::memory_order_acquire)) {// DB is being deleted; no more background compactions}else if (!bg_error_.ok()) {// Already got an error; no more changes}else if (imm_ == nullptr && manual_compaction_ == nullptr &&!versions_->NeedsCompaction()) {// No work to be done}else {background_compaction_scheduled_ = true;env_->Schedule(&DBImpl::BGWork, this);}
}
4. 任务函数BGWork又做了什么呢?
BGWork是一层包装,直接调用数据db参数的方法BackgroundCall;
- 里面又检查了一次数据库已经关闭的场景和数据库任务出错情况,因为从任务排到队列中,到排到任务执行,可能是要等一段时间的,在这个过程中,也可能条件改变了。
- 接着执行BackgroundCompaction来做该db的Compaction;
- 做完Compaction之后,把background_compaction_scheduled设定为false,以允许MaybeScheduleCompaction再次进入;
- 接着就调用MaybeScheduleCompaction函数,再检查一边这个数据有没有待compaction内容未做完,例如这一次只做了MinorCompact,还有MajorCompact要做,再次去排队等执行;
- 然后再来标识库的db后台任务项当前执行完了,发送一个信号background_work_finished_signal_;
background_work_finished_signal_这个信号量通常用来通知写线程或关闭线程,通知该db的后台一项任务做完了,通知等待线程可以继续向后执行了。
void DBImpl::BGWork(void* db) {reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}void DBImpl::BackgroundCall() {MutexLock l(&mutex_);assert(background_compaction_scheduled_);if (shutting_down_.load(std::memory_order_acquire)) {// No more background work when shutting down.}else if (!bg_error_.ok()) {// No more background work after a background error.}else {BackgroundCompaction();}background_compaction_scheduled_ = false;// Previous compaction may have produced too many files in a level,// so reschedule another compaction if needed.MaybeScheduleCompaction();background_work_finished_signal_.SignalAll();
}
个人随笔 (Owed by: 春夜喜雨 http://blog.csdn.net/chunyexiyu)