当前位置: 首页 > news >正文

HBase的异步WAL性能优化:RingBuffer的奥秘

AbstractFSWAL与Ring Buffer

在 HBase 中,预写日志(Write-Ahead Log, WAL)是保证数据持久性和一致性的核心组件。每一次数据写入(Put/Delete)都必须先成功写入 WAL,然后才能更新内存中的 MemStore。这个特性决定了 WAL 的写入性能直接关系到整个 HBase 集群的写入吞吐量和延迟,因此对其性能优化至关重要。

AbstractFSWAL 采用基于 LMAX Disruptor 框架的 RingBuffer 来构建一个高性能的生产者-消费者模型,正是为了应对这一挑战。

为什么要使用 RingBuffer?

在 AbstractFSWAL 的设计中,存在一个典型的“多生产者,单消费者”场景:

  • 多生产者 (Producers):HBase RegionServer 中处理客户端写入请求的 RPC Handler 线程。每个线程在处理写入操作时,都会产生一条 WAL 日志条目(FSWALEntry),需要被写入日志文件。
  • 单消费者 (Consumer):一个专门的后台线程,负责将这些日志条目批量地、持久化地写入底层文件系统(如 HDFS)。

要解耦生产者和消费者,最直接的想法是使用一个线程安全的队列,比如 Java 并发包中的 BlockingQueue。但是,在高并发写入场景下,BlockingQueue 的弊端会很明显:

  • 锁竞争BlockingQueue 的实现依赖于锁(ReentrantLock)。当大量 RPC 线程(生产者)同时尝试向队列中放入数据时,会产生激烈的锁竞争,导致线程阻塞和上下文切换,严重影响性能。
  • GC 压力:每次写入都需要创建一个新的日志条目对象放入队列,这会给垃圾回收(GC)带来压力,可能引发 GC 停顿(GC Pause),导致写入延迟抖动。

为了解决这些问题,HBase 引入了 RingBufferRingBuffer 是 LMAX Disruptor 框架的核心,它是一个为极致性能设计的、基于环形数组的无锁(Lock-Free)数据结构。它通过 CAS(Compare-And-Swap)等原子操作来协调生产者和消费者,避免了使用锁,从而根除了锁竞争带来的性能瓶颈。

RingBuffer 原理和源码分析见:揭秘Disruptor RingBuffer高性能核心机制

总而言之,选择 RingBuffer 的根本原因是为了在 WAL 写入这个关键路径上,实现超高的吞吐量和极低的、可预测的延迟。

RingBuffer 是如何被使用的?

在 AbstractFSWAL.java 中,RingBuffer 的使用可以从以下几个方面来理解:

a. 初始化

在 AbstractFSWAL 的构造函数中,RingBuffer 被创建和初始化:

// ... existing code ...int preallocatedEventCount =conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);waitingConsumePayloads =RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);
// ... existing code ...
  • waitingConsumePayloads: 这就是我们关注的 RingBuffer 实例。
  • createMultiProducer: 表明这是一个支持多个生产者线程并发写入的 RingBuffer,这与 HBase 的 RPC 模型完全匹配。
  • RingBufferTruck::new: 这是一个事件工厂(Event Factory)。RingBuffer 在初始化时,会使用这个工厂预先创建并填满整个环形数组。这里的“事件”就是 RingBufferTruck 对象。这样做是为了后续操作中可以复用这些对象,而不是每次都创建新的,从而避免了 GC 开销。
  • preallocatedEventCount: RingBuffer 的大小,可以通过配置 hbase.regionserver.wal.disruptor.event.count 调整。
b. 事件载体:RingBufferTruck

RingBuffer 中传递的不是原始的日志数据,而是包装过的事件对象 RingBufferTruck

RingBufferTruck.java

@InterfaceAudience.Private
final class RingBufferTruck {public enum Type {APPEND,SYNC,EMPTY}private Type type = Type.EMPTY;/*** Either this syncFuture is set or entry is set, but not both.*/private SyncFuture sync;private FSWALEntry entry;// ... 方法 ...
}

RingBufferTruck 像一辆“卡车”,负责在生产者和消费者之间运送“货物”。

  • 它可以装载两种类型的货物:FSWALEntry(代表一次追加写入操作)或 SyncFuture(代表一次同步刷盘请求)。
  • 通过 Type 枚举来区分当前装载的是哪种货物。
  • 这种设计使得数据写入和刷盘命令可以通过同一个 RingBuffer 按顺序处理,保证了操作的序列化。
c. 生产者逻辑(写入和同步)

当一个 RPC 线程需要写入一条 WAL 日志时,它会执行以下步骤(这是一个标准的 Disruptor 生产者流程):

  1. 申请槽位:调用 waitingConsumePayloads.next() 从 RingBuffer 中申请一个可用的槽位(sequence)。这个操作是无锁的,通过 CAS 实现。
  2. 获取事件对象:调用 waitingConsumePayloads.get(sequence) 获取该槽位上预创建好的 RingBufferTruck 对象。
  3. 装载数据:调用 truck.load(fswalEntry) 将日志条目装载到这辆“卡车”中。
  4. 发布事件:调用 waitingConsumePayloads.publish(sequence),通知消费者这个槽位的数据已经准备就绪。这个操作同样是无锁的。

整个过程,生产者线程完全不涉及磁盘 I/O,也几乎没有锁等待,可以非常快地完成并将控制权交还,从而快速响应客户端。

d. 消费者逻辑

consume() 负责从 RingBuffer 中取出数据,进行批量的写入和同步操作。整个方法的执行由一个专门的 consumeExecutor 线程池来调度,并且设计为单线程执行,以保证 WAL 写入的顺序性。

AbstractFSWAL 中定义了一个消费者任务,由一个专用的线程池 consumeExecutor 执行。

// ... existing code ...protected final Runnable consumer = this::consume;
// ... existing code ...protected final void createSingleThreadPoolConsumeExecutor(String walType, final Path rootDir,final String prefix) {ThreadPoolExecutor threadPool =new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat(walType + "-%d-" + rootDir.toString() + "-prefix:"+ (prefix == null ? "default" : prefix).replace("%", "%%")).setDaemon(true).build());consumeExecutor = threadPool;

consume() 方法的执行可以分为以下几个主要步骤:

1. 前置状态检查与锁保护
// ... existing code ...private void consume() {consumeLock.lock();try {int currentEpochAndState = epochAndState;if (writerBroken(currentEpochAndState)) {return;}if (waitingRoll(currentEpochAndState)) {if (writer.getLength() > fileLengthAtLastSync) {// issue a syncsync(writer);} else {if (unackedAppends.isEmpty()) {readyForRolling = true;readyForRollingCond.signalAll();}}return;}} finally {consumeLock.unlock();}
// ... existing code ...
  • consumeLock.lock(): 方法开始时会获取 consumeLock。这个锁非常关键,它确保了任何时刻只有一个线程在执行消费逻辑,防止了对 toWriteAppendsunackedAppends 等共享状态的并发修改。
  • 状态检查 (writerBrokenwaitingRoll):
    • writerBroken: 检查当前的 WAL writer 是否已经损坏。如果已损坏,消费者将直接返回,不再处理任何数据,等待 rollWriter 流程来替换一个新的 writer。
    • waitingRoll: 检查是否正在等待日志滚动(roll)。如果正在等待,说明已经有 rollWriter 的请求,消费者此时的行为会改变:
      • 如果还有未同步的数据(writer.getLength() > fileLengthAtLastSync),则触发最后一次 sync(),确保当前 writer 的所有数据都落盘。
      • 如果没有未同步数据,并且所有已写入的数据都收到了确认(unackedAppends.isEmpty()),则将 readyForRolling 标志位置为 true 并唤醒可能在等待的 rollWriter 线程,告知它可以安全地进行下一步操作了。
      • 处理完后直接 return,不再消费新的数据,因为旧的 writer 即将被关闭。
2. 从 RingBuffer 消费事件
// ... existing code ...long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor<= cursorBound; nextCursor++) {if (!waitingConsumePayloads.isPublished(nextCursor)) {break;}RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);switch (truck.type()) {case APPEND:toWriteAppends.addLast(truck.unloadAppend());break;case SYNC:syncFutures.add(truck.unloadSync());break;default:LOG.warn("RingBufferTruck with unexpected type: " + truck.type());break;}waitingConsumePayloadsGatingSequence.set(nextCursor);}
// ... existing code ...

这是 consume() 方法的核心循环,负责从 RingBuffer 中批量拉取事件:

  • 它首先获取消费者当前的进度 waitingConsumePayloadsGatingSequence 和生产者最新的进度 waitingConsumePayloads.getCursor()
  • 然后在一个 for 循环中,遍历两者之间的所有事件。
  • waitingConsumePayloads.isPublished(nextCursor) 确保只处理已经被生产者发布(publish)的事件。
  • waitingConsumePayloads.get(nextCursor) 获取事件载体 RingBufferTruck
  • 通过 switch (truck.type()) 判断事件类型:
    • APPEND: 如果是写入请求,就调用 truck.unloadAppend() 取出 FSWALEntry,并将其添加到 toWriteAppends 队列中。这是一个内存中的批处理队列,等待后续被写入文件。
    • SYNC: 如果是同步请求,就调用 truck.unloadSync() 取出 SyncFuture,并将其添加到 syncFutures 这个有序集合中。
  • waitingConsumePayloadsGatingSequence.set(nextCursor): 每处理完一个事件,就更新消费者的进度,告知 RingBuffer 这个槽位可以被生产者重新使用了。

这个循环体现了 RingBuffer 批量处理的优势,一次 consume() 调用可以处理多个事件,非常高效。

3. 写入与同步
// ... existing code ...try {appendAndSync();} catch (IOException exception) {
// ... existing code ...LOG.error("appendAndSync throws IOException.", exception);onAppendEntryFailed(exception);return;}
// ... existing code ...
  • appendAndSync(): 在从 RingBuffer 中取出所有可用事件后,调用 appendAndSync() 方法。这个方法负责将 toWriteAppends 队列中的日志条目(FSWALEntry)实际写入到底层文件系统,并根据条件(如数据量达到 batchSize)触发 sync 操作。
  • 异常处理: 如果 appendAndSync() 抛出 IOException(例如,在 FSHLog 的同步写入实现中),会调用 onAppendEntryFailed() 来处理这个异常,这通常会导致将当前 writer 标记为 broken 并请求一次日志滚动。
4. 消费者任务的重新调度
// ... existing code ...if (hasConsumerTask.get()) {return;}if (toWriteAppends.isEmpty()) {if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {consumerScheduled.set(false);// recheck here since in append and sync we do not hold the consumeLock. Thing may// happen like// 1. we check cursor, no new entry// 2. someone publishes a new entry to ringbuffer and the consumerScheduled is true and// give up scheduling the consumer task.// 3. we set consumerScheduled to false and also give up scheduling consumer task.if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {// we will give up consuming so if there are some unsynced data we need to issue a sync.if (writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty()&& syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) {// no new data in the ringbuffer and we have at least one sync requestsync(writer);}return;} else {// maybe someone has grabbed this before usif (!consumerScheduled.compareAndSet(false, true)) {return;}}}}// reschedule if we still have something to write.consumeExecutor.execute(consumer);}
// ... existing code ...

这是 consume() 方法的收尾部分,逻辑非常精巧,目的是决定是否需要再次向 consumeExecutor 提交一个新的 consumer 任务来处理后续的数据。

  • if (hasConsumerTask.get()): 检查执行队列里是否已经有了一个待执行的 consumer 任务。如果有,就直接返回,避免重复调度。
  • if (toWriteAppends.isEmpty()): 如果本地的写入队列空了,说明这一批数据处理完了。
    • 双重检查: 此时会进行一次非常关键的双重检查 waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()。这是为了处理一个竞态条件:在当前线程检查 RingBuffer 为空到它决定不再调度自己之间,可能有新的生产者发布了事件。通过双重检查和 consumerScheduled 这个 AtomicBoolean 标志位,可以确保不会漏掉任何事件。
    • 最后的 sync: 如果确定 RingBuffer 中没有新数据,并且当前线程即将“休眠”,它会检查是否还有未同步的数据(writer.getLength() > fileLengthAtLastSync)和待处理的 sync 请求。如果有,就触发最后一次 sync,确保所有已写入的数据都得到持久化。
  • consumeExecutor.execute(consumer): 如果检查后发现 toWriteAppends 队列中仍有数据,或者在双重检查中发现 RingBuffer 中有了新数据,就会再次向 consumeExecutor 提交 consumer 任务,形成一个自我驱动的循环。

使用 RingBuffer 的优势是什么?

总结起来,AbstractFSWAL 使用 RingBuffer 带来了以下核心优势:

  1. 极致的性能:通过无锁化设计,彻底消除了多生产者之间的锁竞争,极大地提升了 WAL 的写入吞吐量,降低了 P99 等高百分位延迟。
  2. 避免 GC 抖动RingBuffer 预分配事件对象(RingBufferTruck)并循环使用它们的模式,避免了在关键路径上创建大量小对象,显著降低了 GC 压力和由 GC 引发的服务停顿,使得系统延迟更加平稳和可预测。
  3. 高效的批量处理:消费者的模型天然支持批量处理。将多次小的写入请求在内存中聚合成一次大的写入,能更充分地利用磁盘 I/O 带宽,这对于像 HDFS 这样的文件系统尤其高效。
  4. 清晰的职责分离:生产者(RPC 线程)只管快速生成数据并放入缓冲区,不关心 I/O 细节。消费者(WAL 写入线程)则专注于如何高效、可靠地将数据持久化。这种清晰的分离使得代码逻辑更简单,也更容易进行针对性的优化。
  5. 利用“机械同情”:Disruptor 的设计充分考虑了现代 CPU 的缓存架构。例如,它通过缓存行填充(Cache Line Padding)来避免伪共享(False Sharing),进一步压榨硬件性能。

综上所述,HBase 在其最核心的 WAL 模块中使用 RingBuffer,是一个精心设计的、旨在追求极致性能的典范。它通过无锁并发、对象复用和高效批处理等技术,确保了 HBase 在高并发写入场景下的稳定性和高性能。

总结

consume() 方法是 AbstractFSWAL 中生产者-消费者模型的核心实现。它通过单线程消费、批量处理、精巧的状态管理和任务调度机制,实现了以下目标:

  1. 顺序保证: 通过 consumeLock 和单线程执行器,保证了所有 WAL 事件被顺序处理和写入。
  2. 高性能: 从 RingBuffer 批量拉取事件,在内存中聚合后批量写入磁盘,最大化了 I/O 效率。
  3. 鲁棒性: 能够正确处理 writer 损坏、日志滚动等异常情况,保证数据一致性。
  4. 高效调度: 通过自我驱动的调度机制,只在有工作可做时才保持活跃,没有数据时则自动“休眠”,避免了不必要的 CPU 消耗。

这个方法的设计充分体现了高性能系统编程中的常见模式和技巧,是理解 HBase 写入路径性能关键的入口。

AbstractFSWAL

AbstractFSWAL 是 HBase 中预写日志(WAL)功能的一个关键基石。它实现了 WAL 接口,为所有 基于文件系统(如 HDFS) 的 WAL 提供了一个通用的、功能完备的骨架。具体的 WAL 实现(如 FSHLog 和 AsyncFSWAL)都继承自这个类。

@InterfaceAudience.Private
public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// ...

  • public abstract class: 这表明 AbstractFSWAL 是一个抽象类,不能被直接实例化。它定义了 WAL 的通用行为和算法流程,但将一些具体的实现细节延迟到子类中。这是一种典型的模板方法设计模式
  • <W extends WriterBase>: 这是一个泛型参数。W 代表了真正执行写入操作的“写入器”(Writer)的类型。WriterBase 是一个接口,约束了写入器必须具备的能力(如 appendsyncgetLength)。这种设计使得 AbstractFSWAL 可以与不同类型的写入器(例如,同步的、异步的)解耦,提供了极大的灵活性。
  • implements WAL: 这表明该类遵循了 WAL 接口定义的契约,实现了 appendsyncrollWriter 等所有核心方法,为上层(HRegion)提供了标准的预写日志服务。

核心职责与设计思想

AbstractFSWAL 的核心职责是管理 WAL 文件的整个生命周期,并提供一个高性能、高可靠的日志追加和持久化机制。其设计思想贯穿了以下几个方面:

  • 高性能异步化:通过 LMAX Disruptor 的 RingBuffer 实现生产者-消费者模型,将数据写入请求与实际的磁盘 I/O 解耦,实现批量、异步写入,最大化吞吐量。
  • 可靠性保证:通过 sync 机制强制将数据刷写到 HDFS 并等待确认,确保数据持久不丢失。
  • 生命周期管理:自动化地处理日志文件的创建、滚动(Rolling)、归档和清理。
  • 高度可配置性:提供了大量的配置参数,允许管理员根据硬件和负载特性进行深度调优。

我们可以将这个庞大的类拆解为几个逻辑部分来理解。

配置项与常量

AbstractFSWAL 定义了大量 static final String 类型的常量,这些都是可以在 hbase-site.xml 中配置的参数。

// ... existing code ...protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms";public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";public static final String MAX_LOGS = "hbase.regionserver.maxlogs";public static final String RING_BUFFER_SLOT_COUNT ="hbase.regionserver.wal.disruptor.event.count";public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
// ... existing code ...
  • 性能与行为控制: 如 WAL_BATCH_SIZE (WAL 批量大小)、RING_BUFFER_SLOT_COUNT (内部 RingBuffer 的大小)。
  • 日志滚动策略: 如 MAX_LOGS (最大 WAL 文件数)、hbase.regionserver.logroll.size (单个 WAL 文件大小,通过 logrollsize 字段使用)。
  • 超时与健康监测: 如 SLOW_SYNC_TIME_MS (慢同步阈值)、WAL_SYNC_TIMEOUT_MS (同步超时时间)。

这些配置项使得 AbstractFSWAL 的行为具备极高的可塑性。

核心状态字段

这些字段维护了 WAL 运行时的核心状态。

// ... existing code ...protected final FileSystem fs;protected final Path walDir;protected final Path walArchiveDir;protected final ReentrantLock rollWriterLock = new ReentrantLock(true);protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props =new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);volatile W writer;protected final AtomicLong highestSyncedTxid = new AtomicLong(0);protected volatile long highestUnsyncedTxid = -1;
// ... existing code ...
  • fswalDirwalArchiveDir: 分别代表了文件系统实例、当前 WAL 文件目录和归档目录的路径。
  • writer核心字段,指向当前正在写入的日志文件写入器。volatile 关键字确保了其在多线程间的可见性,尤其是在日志滚动切换写入器时。
  • rollWriterLock: 一个可重入锁,用于确保任何时候只有一个线程在执行日志滚动(rollWriter)操作,防止并发操作导致状态混乱。
  • sequenceIdAccounting: 一个非常重要的辅助类,负责记录每个 Region 已经刷写到 HFile 的最大 SequenceId。这是判断一个旧 WAL 文件是否可以被安全归档的核心依据。
  • walFile2Props: 一个并发跳表,存储了所有“活跃”(未归档)的 WAL 文件路径及其属性(WALProps),并根据文件名中的时间戳排序。
  • highestSyncedTxid / highestUnsyncedTxid: 这两个原子变量分别追踪已成功持久化的最大事务ID和已提交到缓冲区但尚未持久化的最大事务ID,是 sync 机制的核心状态。

高性能写入核心:Disruptor 与生产者-消费者模型

这是 AbstractFSWAL 实现高性能的关键。

// ... existing code ...private final RingBuffer<RingBufferTruck> waitingConsumePayloads;private final Sequence waitingConsumePayloadsGatingSequence;protected final Runnable consumer = this::consume;private final Lock consumeLock = new ReentrantLock();protected final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();protected final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();protected final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR);
// ... existing code ...
  • 生产者: 处理客户端请求的 RPC 线程。当调用 append() 时,它会从 RingBuffer 中获取一个“卡车”(RingBufferTruck),将 WALEdit 装载进去,然后发布。这个过程非常快。
  • 消费者: 一个由 consumeExecutor 运行的后台线程,其执行逻辑在 consumer (this::consume) 中。
  • 缓冲区waitingConsumePayloads,即 RingBuffer
  • 流程:
    1. 生产者(RPC线程)调用 append,将数据放入 RingBuffer,并获得一个 SyncFuture 对象后等待。
    2. 消费者线程 (consumer) 在后台循环,批量地从 RingBuffer 中取出数据(FSWALEntry),放入 toWriteAppends 队列。
    3. 消费者线程调用 doAppend()(由子类实现)将 toWriteAppends 队列中的数据写入 HDFS 的客户端缓冲区。
    4. 当 sync() 被调用时,消费者线程会调用 writer.sync(),将 HDFS 客户端缓冲区的数据强制刷到 DataNode。
    5. sync 成功后,消费者线程会完成(complete)对应的 SyncFuture,唤醒等待的生产者线程。

日志滚动(Log Rolling)与清理(Cleanup)

这是 WAL 生命周期管理的核心。

// ... existing code ...@Overridepublic Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter"));}private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException {rollWriterLock.lock();try {
// ... existing code ...} finally {rollWriterLock.unlock();}}protected void cleanOldLogs() {
// ...}
// ... existing code ...
  • rollWriter(): 对外暴露的接口,它会调用被 rollWriterLock 锁保护的 rollWriterInternal
  • 滚动触发条件: 文件大小达到阈值 (logrollsize)、强制执行 (force=true)、写入器出现错误、或检测到长时间的慢同步。
  • 滚动过程:
    1. 获取 rollWriterLock 锁。
    2. 创建一个新的日志文件和对应的 writer 实例。
    3. 将旧的 writer 关闭(可能是异步的)。
    4. 更新 writer 引用,使其指向新的写入器。
    5. 释放锁。
  • cleanOldLogs(): 负责归档旧的 WAL 文件。它会遍历 walFile2Props 中记录的 WAL 文件,并结合 sequenceIdAccounting 的信息,判断一个文件是否可以被安全地移动到归档目录 (walArchiveDir)。

抽象方法与子类扩展

AbstractFSWAL 定义了骨架,但将与具体写入技术相关的部分抽象出去。

// ... existing code ...protected abstract W createWriterInstance(FileSystem fs, Path path)throws IOException, CommonFSUtils.StreamLacksCapabilityException;protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;protected abstract void doSync(W writer, long txid) throws IOException;
// ... existing code ...
  • createWriterInstance()工厂方法。子类必须实现它来创建自己特定类型的写入器 W
  • doAppend() / doSync(): 子类必须实现这两个方法,定义如何将一个 FSWALEntry 真正地写入流,以及如何执行同步操作。

例如,FSHLog 会创建同步的 ProtobufLogWriter,而 AsyncFSWAL 则会创建异步的 AsyncProtobufLogWriter,它们的 doAppend 和 doSync 实现也因此不同。

append

这个方法是 AbstractFSWAL(文件系统基础的 WAL 实现的抽象基类)中实现数据追加(append)的核心入口。无论是用户数据的写入(appendData)还是系统元数据标记的写入(appendMarker),最终都会调用到这个方法。它的主要作用是将一个写操作(封装在 WALEdit 中)发布到内部的处理队列(一个 RingBuffer),并触发后续的异步处理流程。

protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)throws IOException
  • protected long append(...): 这是一个受保护的方法,意味着它可以被 AbstractFSWAL 自身以及其子类(如 FSHLog 和 AsyncFSWAL)调用。它返回一个 long 类型的事务 ID (txid),这个 ID 在 WAL 内部是唯一的,用于跟踪和同步。
  • RegionInfo hri: 写入操作所属的 Region 的信息。
  • WALKeyImpl key: WAL 条目的键。它包含了 Region 名称、表名、写入时间戳、MVCC(多版本并发控制)信息等。这个对象在方法调用过程中会被修改,填入分配到的序列号(Sequence ID)。
  • WALEdit edits: 包含了实际要写入的数据,即一个或多个 Cell。对于标记性操作,它可能不包含实际的 Cell 数据。
  • boolean inMemstore: 一个非常重要的标志。
    • true: 表示这是一个常规的数据写入操作,数据最终会进入 MemStore。
    • false: 表示这是一个“标记性”(Marker)的元数据操作,例如记录 Compaction 完成、Region 打开/关闭等事件。这些标记只写入 WAL 用于故障恢复,不会进入 MemStore。
  • throws IOException: 如果 WAL 已经关闭,或者在处理过程中出现其他 I/O 问题,会抛出此异常。

代码逻辑详解

  protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)throws IOException {if (markerEditOnly && !edits.isMetaEdit()) {throw new IOException("WAL is closing, only marker edit is allowed");}long txid =stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);if (shouldScheduleConsumer()) {consumeExecutor.execute(consumer);}return txid;}

这个方法的逻辑非常清晰和高层,主要包含三个步骤:

前置检查

    if (markerEditOnly && !edits.isMetaEdit()) {throw new IOException("WAL is closing, only marker edit is allowed");}
...
  • markerEditOnly 是一个标志位,当 WAL 正在准备关闭或滚动(roll)时,它会被设置为 true
  • edits.isMetaEdit() 检查当前的 WALEdit 是否是一个元数据编辑(即标记性操作)。
  • 这行代码的含义是:如果 WAL 当前处于“只接受标记性编辑”的状态,但传入的却是一个普通的数据编辑,那么就直接抛出 IOException,拒绝该写入。这是一种保护机制,确保在 WAL 关闭流程中不会有新的用户数据写入。

分配序列号并发布到 RingBuffer

    long txid =stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
...

这是 append 方法的核心。它将所有复杂的工作委托给了 stampSequenceIdAndPublishToRingBuffer 方法。我们来分析这个子函数。

stampSequenceIdAndPublishToRingBuffer 

protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException {if (this.closed) {throw new IOException("Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());}MutableLong txidHolder = new MutableLong();MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {txidHolder.setValue(ringBuffer.next());});long txid = txidHolder.longValue();ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);key.setWriteEntry(we);if (!edits.isReplay()) {// For replay, the sequence ids are already set.for (Cell cell : WALEditInternalHelper.getExtendedCells(edits)) {PrivateCellUtil.setSequenceId(cell, we.getWriteNumber());}}FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);try {RingBufferTruck truck = ringBuffer.get(txid);truck.load(entry);} finally {ringBuffer.publish(txid);}return txid;}

这个方法做了几件关键事情:

  1. 获取事务 ID (txid) 和写条目 (WriteEntry):

    • 它通过 key.getMvcc().begin(...) 启动一个 MVCC 事务。MVCC 是 HBase 实现读写并发控制的核心机制。begin() 方法返回一个 WriteEntry 对象,该对象包含了一个唯一的、递增的写编号(Write Number),这个编号将作为此次操作的序列号(Sequence ID)。
    • 在 begin 方法的 lambda 表达式中,它调用 ringBuffer.next()。这是一个关键的并发操作。RingBuffer (Disruptor 模式的实现) 会预先分配好槽位(slot),next() 方法会原子性地获取下一个可用的槽位索引,这个索引就被用作本次操作的事务 ID (txid)。txidHolder 用来在 lambda 表达式内外传递这个值。
    • 总结: 这一步同时获得了 MVCC 的写编号(用于数据一致性)和 RingBuffer 的事务 ID(用于内部处理流程)。
  2. 为 Cell 打上序列号:

    • key.setWriteEntry(we): 将获取到的 WriteEntry 存入 WALKey
    • if (!edits.isReplay()): 检查这个写操作是否是来自复制(Replication)的回放。如果不是回放,就需要为 WALEdit 中的每一个 Cell 设置序列号。
    • PrivateCellUtil.setSequenceId(cell, we.getWriteNumber()): 将从 MVCC 获取到的写编号(Sequence ID)设置到每个 Cell 中。这是实现读-写隔离的关键一步。读取操作会根据自己的读点(Read Point)来决定哪些 Cell 对自己可见。
  3. 创建 FSWALEntry:

    • FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
    • 将 txidkeyedits 以及其他相关信息封装成一个 FSWALEntry 对象。这个对象是 WAL 内部处理的基本单元,它聚合了写入操作所需的所有信息。
  4. 发布到 RingBuffer:

    • RingBufferTruck truck = ringBuffer.get(txid);: 根据 txid 获取 RingBuffer 中对应的槽位,这个槽位由一个 RingBufferTruck 对象表示。
    • truck.load(entry);: 将 FSWALEntry "装载" 到卡车(truck)上。
    • ringBuffer.publish(txid);发布这个槽位。这一步执行后,消费者线程(Consumer)就能够看到这个新的 FSWALEntry 并开始处理它了。

触发消费者

    if (shouldScheduleConsumer()) {consumeExecutor.execute(consumer);}
...

在将任务发布到 RingBuffer 之后,需要确保有消费者线程去处理它。

  • shouldScheduleConsumer(): 这是一个检查方法,它会原子性地(使用 compareAndSet)检查当前是否已经有计划中的消费者任务。

    private boolean shouldScheduleConsumer() {int currentEpochAndState = epochAndState;if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {return false;}return consumerScheduled.compareAndSet(false, true);
    }
    
    • 它首先检查 WAL 是否处于损坏或等待滚动的状态,如果是,则不调度新的消费任务。
    • 然后,它尝试将 consumerScheduled 标志从 false 设置为 true。如果设置成功(意味着之前没有调度任务),则返回 true。如果失败(意味着其他线程已经调度了任务),则返回 false。这可以防止不必要的重复调度。
  • consumeExecutor.execute(consumer): 如果 shouldScheduleConsumer() 返回 true,它就会向 consumeExecutor(一个线程池)提交一个 consumer 任务。这个 consumer 任务的核心是调用 consume() 方法,该方法会从 RingBuffer 中拉取 FSWALEntry,然后调用 appendAndSync() 将其写入文件系统。

这个线程池是个单线程

ThreadPoolExecutor threadPool =new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat(walType + "-%d-" + rootDir.toString() + "-prefix:"+ (prefix == null ? "default" : prefix).replace("%", "%%")).setDaemon(true).build());hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;consumeExecutor = threadPool;

整体流程总结

append 方法的整个调用链体现了高性能的生产者-消费者模式(基于 Disruptor 的 RingBuffer):

  1. 生产者 (调用 append 的线程):

    • 快速地完成前置检查。
    • 通过 stampSequenceIdAndPublishToRingBuffer,以极低的锁竞争(ringBuffer.next() 是无锁的)获取事务 ID 和序列号。
    • 将数据封装成 FSWALEntry 并发布到 RingBuffer 中。
    • 检查并触发一次消费者任务。
    • 然后立即返回 txid 给调用者(例如 HRegion)。
  2. 消费者 (运行在 consumeExecutor 中的线程):

    • 在后台被唤醒。
    • 从 RingBuffer 中批量拉取 FSWALEntry
    • 调用 appendAndSync 方法,将这些 FSWALEntry 真正地写入到底层文件系统的 WAL 文件中。
    • 根据需要(例如缓冲区满了或有 sync 请求)执行同步操作。

这种设计将 I/O 密集型的写文件和同步操作与客户端请求处理线程解耦,使得 append 调用可以非常快地返回,从而极大地提高了 HBase 的写入吞吐量。

总结

AbstractFSWAL 是一个设计精良、功能强大的抽象类。它通过模板方法模式定义了 WAL 的核心工作流,通过生产者-消费者模型和 Disruptor RingBuffer 实现了高性能的异步写入,通过锁和原子变量保证了并发环境下的线程安全和状态一致性,并通过精密的 SequenceId 核算机制实现了可靠的日志生命周期管理。它是理解 HBase 数据写入可靠性和高性能特性的关键一环。

AbstractFSWAL.consume() 方法

这个方法是 AbstractFSWAL 中消费者线程的核心执行逻辑。在整个 WAL 的生产者-消费者模型中,append 和 doSync 方法是生产者,它们将 FSWALEntry (写请求) 和 SyncFuture (同步请求) 放入 RingBuffer。而 consume() 方法就是消费者,它被一个单独的线程池 (consumeExecutor) 执行,负责从 RingBuffer 中取出这些请求,并将它们真正地写入文件系统。

这个方法的设计目标是高效、批量地处理数据,并正确地管理 WAL 的状态(如滚动日志、处理异常等)。

private void consume() {// 1. 状态检查与锁保护consumeLock.lock();try {// ... 检查 writer 是否损坏或正在等待滚动 ...} finally {consumeLock.unlock();}// 2. 从 RingBuffer 中批量拉取数据// ... 循环从 waitingConsumePayloads 中获取 RingBufferTruck ...// 3. 处理 RingBuffer 关闭时的特殊情况if (markerEditOnly) {drainNonMarkerEditsAndFailSyncs();}// 4. 将拉取的数据写入文件系统并触发同步try {appendAndSync();} catch (IOException exception) {// ... 处理写入异常 ...return;}// 5. 消费者任务的自我调度与退出逻辑// ... 判断是否需要重新调度自己,或者在空闲时退出 ...
}

下面我们逐个部分进行详细分析。

状态检查与锁保护

private void consume() {consumeLock.lock();try {int currentEpochAndState = epochAndState;if (writerBroken(currentEpochAndState)) {return;}if (waitingRoll(currentEpochAndState)) {if (writer.getLength() > fileLengthAtLastSync) {// issue a syncsync(writer);} else {if (unackedAppends.isEmpty()) {readyForRolling = true;readyForRollingCond.signalAll();}}return;}} finally {consumeLock.unlock();}// ...
}
  • consumeLock.lock(): 方法开始时获取 consumeLock。这个锁主要用于保护 WAL 的状态转换,特别是与日志滚动(rolling)相关的状态变量,如 epochAndStatereadyForRolling 等。它确保了在检查和处理这些状态时不会与请求日志滚动的线程(如 rollWriter)发生冲突。
  • if (writerBroken(...)): 检查 WAL writer 是否已损坏。如果已损坏,消费者线程就没必要继续工作了,直接返回。
  • if (waitingRoll(...)): 检查是否正处于“等待滚动”状态。这是日志滚动的准备阶段。
    • 如果当前还有未同步到磁盘的数据(writer.getLength() > fileLengthAtLastSync),则调用 sync(writer) 触发一次最终的同步,确保旧日志文件的数据完整性。
    • 如果所有数据都已同步(unackedAppends.isEmpty()),则将 readyForRolling 标志设置为 true,并通过 readyForRollingCond.signalAll() 唤醒正在等待的 rollWriter 线程,告诉它可以安全地进行日志切换了。
    • 处理完滚动逻辑后,直接 return,本次消费任务结束。

从 RingBuffer 中批量拉取数据

// ...long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor<= cursorBound; nextCursor++) {if (!waitingConsumePayloads.isPublished(nextCursor)) {break;}RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);switch (truck.type()) {case APPEND:toWriteAppends.addLast(truck.unloadAppend());break;case SYNC:syncFutures.add(truck.unloadSync());break;default:LOG.warn("RingBufferTruck with unexpected type: " + truck.type());break;}waitingConsumePayloadsGatingSequence.set(nextCursor);}
// ...

这是消费者从 RingBuffer "卸货" 的过程。

  • waitingConsumePayloadsGatingSequence: 这是消费者自己的进度计数器 (Sequence)。它记录了消费者已经处理到的序列号。
  • waitingConsumePayloads.getCursor(): 这是生产者的进度,即已经发布到 RingBuffer 的最新序列号。
  • for 循环: 这个循环会尝试处理从 (自己的进度 + 1) 到 生产者最新进度 之间的所有事件。
  • isPublished(nextCursor): 这是一个必要的检查,因为在多生产者模式下,序列号的发布可能不是连续的。这个检查确保我们只处理真正被发布了的事件。
  • switch (truck.type())RingBufferTruck 是一个可以装载不同类型货物的“卡车”。
    • APPEND: 如果是写请求,就调用 truck.unloadAppend() 取出 FSWALEntry,并将其放入 toWriteAppends 这个 Deque (双端队列) 中。这是一个本地的待写缓冲区。
    • SYNC: 如果是同步请求,就调用 truck.unloadSync() 取出 SyncFuture,并将其放入 syncFutures 这个 SortedSet 中。
  • waitingConsumePayloadsGatingSequence.set(nextCursor): 每处理完一个事件,就更新自己的进度。

这个批量拉取的设计非常高效,它一次性地将 RingBuffer 中的可用事件全部转移到本地队列,减少了与 RingBuffer 的交互次数。

将数据写入文件系统并触发同步 (appendAndSync)

// ...try {appendAndSync();} catch (IOException exception) {/*** For {@link FSHog},here may catch IOException,but for {@link AsyncFSWAL}, the code doesn't* go in here.*/LOG.error("appendAndSync throws IOException.", exception);onAppendEntryFailed(exception);return;}
// ...

在将数据从 RingBuffer 拉取到 toWriteAppends 后,appendAndSync() 方法负责将这些数据真正写入文件。

appendAndSync() 

这个方法的核心逻辑是:

  1. 遍历 toWriteAppends 队列:

    for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {FSWALEntry entry = iter.next();boolean appended = appendEntry(writer, entry); // 核心写入调用// ...if (appended) {unackedAppends.addLast(entry); // 加入待确认队列}// ...
    }
    
    • 它会循环处理 toWriteAppends 中的每个 FSWALEntry
    • appendEntry(writer, entry): 这是一个抽象方法或具体实现,负责将 entry 的内容序列化并写入底层的 writer(即 WAL 文件流)。
    • unackedAppends: 如果写入成功,entry 会被从 toWriteAppends 移除,并加入到 unackedAppends 队列。这个队列里的条目代表“已写入但未同步(fsync)到磁盘”。
  2. 判断是否触发同步 (sync):

    if (writer.getLength() - fileLengthAtLastSync >= batchSize) {// sync because buffer size limit.sync(writer);return;
    }
    
    • 在写入一批数据后,它会检查自上次同步以来写入的数据量是否超过了 batchSize 阈值。
    • 如果超过了,就调用 sync(writer) 方法来执行一次文件系统的同步操作,将内核缓冲区的数据刷到磁盘。

sync(writer)

private void sync(W writer) {// ...final long epoch = (long) epochAndState >>> 2L;addListener(doWriterSync(writer, shouldUseHsync, currentHighestProcessedAppendTxid),(result, error) -> {if (error != null) {syncFailed(epoch, error);} else {long syncedTxid = getSyncedTxid(currentHighestProcessedAppendTxid, result);syncCompleted(epoch, writer, syncedTxid, startTimeNs);}}, consumeExecutor);
}

sync 方法本身是非阻塞的。它将真正的同步操作封装成一个异步任务。

  • doWriterSync(...): 这是一个抽象方法,由子类(FSHLog 或 AsyncFSWAL)实现,负责执行真正的 hsync 或 hflush 操作。它返回一个 CompletableFuture<Long>
  • addListener(...): 它为 CompletableFuture 添加一个回调。当异步的 sync 操作完成时,这个回调会被执行。
    • 如果 sync 失败 (error != null),则调用 syncFailed() 处理异常。
    • 如果 sync 成功,则调用 syncCompleted() 来更新状态,清理 unackedAppends 队列,并完成相关的 SyncFuture

消费者任务的自我调度与退出逻辑

// ...if (hasConsumerTask.get()) {return;}if (toWriteAppends.isEmpty()) {if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {consumerScheduled.set(false);// ... recheck ...if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {// ... 检查是否有残留的 sync 请求,有则触发 sync ...return; // 退出消费循环} else {// ... 重新尝试将自己标记为 scheduled ...}}}// reschedule if we still have something to write.consumeExecutor.execute(consumer);
}

这是 consume 方法的尾部,也是最复杂的部分之一,它决定了消费者线程的生命周期。

  • 目标: 避免不必要的线程空转,同时确保不会漏掉任何新来的数据。
  • 核心逻辑:
    1. 如果本地还有待写入的数据 (!toWriteAppends.isEmpty()),说明工作还没做完,直接调用 consumeExecutor.execute(consumer) 重新调度自己,立即开始下一轮消费。
    2. 如果本地数据写完了 (toWriteAppends.isEmpty()),并且 RingBuffer 也空了 (gatingSequence == cursor),这表明消费者可能要进入空闲状态了。
    3. 此时,它会尝试将 consumerScheduled 标志位设置为 false。这是一个原子操作。
    4. 双重检查 (Double-Check): 在设置 consumerScheduled 为 false 之后,它会再次检查 gatingSequence == cursor。这是为了处理一个经典的竞态条件:
      • T1 (消费者): 检查发现 RingBuffer 为空。
      • T2 (生产者): 此时向 RingBuffer 发布了一个新数据,并尝试调度消费者,但发现 consumerScheduled 是 true,于是放弃调度。
      • T1 (消费者): 将 consumerScheduled 设置为 false 并退出。
      • 结果:数据在 RingBuffer 中,但没有消费者去处理它,系统卡死。
    5. 通过双重检查,如果在设置 consumerScheduled 为 false 后发现又有新数据了,消费者会再次尝试将 consumerScheduled 设置回 true 并重新调度自己,从而避免了上述问题。
    6. 如果双重检查也通过了,说明系统确实空闲了,消费者线程就可以安全地 return,结束本次执行,等待下一次被生产者唤醒。

总结

consume() 方法是 HBase WAL 高性能异步写入模型的心脏。它通过一个独立的消费者线程,实现了以下功能:

  • 批量处理: 从 RingBuffer 批量拉取事件,减少并发开销。
  • 状态管理: 安全地处理日志滚动和 writer 异常等状态转换。
  • 异步I/O: 将耗时的 sync 操作异步化,不阻塞消费主流程。
  • 高效调度: 通过精巧的无锁和双重检查机制,实现了消费者线程的自我调度和休眠,在保证数据不丢失的前提下,最大限度地减少了 CPU 的空转。

整个流程清晰地将“接收数据”、“写入数据”和“持久化数据”三个阶段解耦,并通过批量和异步处理,实现了极高的写入吞吐量。

FSHLog

FSHLog 是 HBase 预写日志(WAL)最经典、也是在 2.x 版本之前默认的实现。它完整地实现了 AbstractFSWAL 定义的框架,提供了一个基于 HDFS 的、可靠的、同步的 WAL 解决方案。理解 FSHLog 对于理解 HBase 的数据写入和持久化机制至关重要。

@InterfaceAudience.Private
public class FSHLog extends AbstractFSWAL<Writer> {
// ...
  • public class FSHLog: 这是一个具体的实现类,可以直接被 WALFactory 实例化和使用。
  • extends AbstractFSWAL<Writer>: 这表明 FSHLog 继承了 AbstractFSWAL 的所有通用逻辑,包括基于 RingBuffer 的生产者-消费者模型、日志滚动、清理等。
  • <Writer>: 这里的泛型参数 Writer 是一个关键点。它不是 java.io.Writer,而是 org.apache.hadoop.hbase.wal.WALProvider.Writer 的一个子接口,具体定义在 FSHLogProvider 中。这个 Writer 接口定义了向 WAL 文件中追加(append)和同步(sync)条目的具体行为。FSHLog 使用的 Writer 实现通常是 ProtobufLogWriter,它负责将 WALEdit 序列化为 Protobuf 格式并写入 HDFS。

核心职责与设计思想

FSHLog 的核心职责是作为 AbstractFSWAL 框架的一个具体化实例。它的设计思想可以概括为:

  1. 同步持久化FSHLog 的 sync 操作是阻塞和同步的。当一个线程调用 sync 时,它必须等待数据真正被刷写到 HDFS 的 DataNode 后才能返回。这是其可靠性的基石。
  2. 并发优化: 尽管单个 sync 是阻塞的,但 FSHLog 引入了一个重要的优化来处理高并发的 sync 请求:SyncRunner 线程池。它避免了让 AbstractFSWAL 的单个消费者线程成为 sync 操作的瓶颈。
  3. 可靠性增强: 增加了对 HDFS 副本数不足的检查机制 (checkLowReplication),当发现写入的 WAL 文件副本数过低时,会主动触发日志滚动,以尽快将数据写入一个新的、健康的 HDFS 文件中。

FSHLog 在继承 AbstractFSWAL 的基础上,增加了自己特有的字段和逻辑。

SyncRunner 线程池:并发 sync 的核心

这是 FSHLog 相对于其父类 AbstractFSWAL 最重要的一个增强。AbstractFSWAL 的模型中只有一个消费者线程,如果这个线程在执行耗时很长的 sync 操作时,会阻塞后续所有 append 和 sync 的处理。FSHLog 通过引入一个专门的 SyncRunner 线程池来解决这个问题。

// ... existing code ...private final int syncerCount;private int syncRunnerIndex = 0;private SyncRunner[] syncRunners = null;
// ... existing code ...@Overrideprotected CompletableFuture<Long> doWriterSync(Writer writer, boolean shouldUseHSync,long txidWhenSync) {CompletableFuture<Long> future = new CompletableFuture<>();SyncRequest syncRequest = new SyncRequest(writer, shouldUseHSync, txidWhenSync, future);this.offerSyncRequest(syncRequest);return future;}private void offerSyncRequest(SyncRequest syncRequest) {for (int i = 0; i < this.syncRunners.length; i++) {this.syncRunnerIndex = (this.syncRunnerIndex + 1) % this.syncRunners.length;if (this.syncRunners[this.syncRunnerIndex].offer(syncRequest)) {return;}}// ...}
// ... existing code ...
  • 工作流程:

    1. AbstractFSWAL 的消费者线程不再自己执行 sync。当需要 sync 时,它会调用 doWriterSync 方法。
    2. doWriterSync 将 sync 操作封装成一个 SyncRequest 对象。
    3. offerSyncRequest 方法会以 轮询(Round-Robin) 的方式,将这个 SyncRequest 提交给 syncRunners 线程池中的一个 SyncRunner 线程。
    4. SyncRunner 是一个独立的线程,它内部有一个阻塞队列 syncRequests。它从队列中取出请求,执行耗时的 HDFS sync 调用。
    5. sync 完成后,SyncRunner 线程会完成(complete)SyncRequest 中携带的 CompletableFuture,从而唤醒最初等待 sync 结果的线程。
  • 优势: 这种设计将 AbstractFSWAL 的主消费者线程从耗时的 sync 操作中解放出来,使其可以继续处理后续的 append 请求,而将 sync 的压力分散到多个 SyncRunner 线程上,极大地提高了 WAL 在高并发写入场景下的吞吐和延迟表现。

低副本数检测机制

为了应对 HDFS DataNode 故障导致文件副本数不足的情况,FSHLog 引入了一套主动检测和恢复机制。

// ... existing code ...private final int minTolerableReplication;private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0);private final int lowReplicationRollLimit;private volatile boolean lowReplicationRollEnabled = true;
// ... existing code ...
  • minTolerableReplication: 可容忍的最低副本数,可配置。
  • checkLowReplication() (在 postSync 中被调用): 每次 sync 之后,会检查当前 HDFS 输出流的实际副本数。
  • 触发滚动: 如果实际副本数低于 minTolerableReplication,就会触发一次日志滚动(requestLogRoll(LOW_REPLICATION))。这样做的目的是放弃当前不健康的 HDFS 文件,立刻切换到一个新的、副本数正常的 HDFS 文件上,以保证后续写入的可靠性。
  • 保护机制: 为了防止因 HDFS 集群持续异常而导致无限次的日志滚动,lowReplicationRollLimit 和 consecutiveLogRolls 字段提供了一个熔断机制。如果连续因为低副本数触发滚动的次数超过限制,就会暂时禁用这个功能 (lowReplicationRollEnabled = false),直到副本数恢复正常。

实现 AbstractFSWAL 的抽象方法

FSHLog 为父类的抽象方法提供了具体的实现,这是它作为具体 WAL 实现的核心。

  • createWriterInstance(FileSystem fs, Path path):

    // ... existing code ...@Overrideprotected Writer createWriterInstance(FileSystem fs, Path path) throws IOException {Writer writer = FSHLogProvider.createWriter(conf, fs, path, false, this.blocksize);if (writer instanceof ProtobufLogWriter) {preemptiveSync((ProtobufLogWriter) writer);}return writer;}
    

    这个方法通过 FSHLogProvider.createWriter 创建一个 Writer 实例,通常是 ProtobufLogWriter。它还执行了一个 preemptiveSync(预同步)操作,这是一个优化,目的是在真正写入数据前就建立好 HDFS 的写入管道,减少第一次真实写入时的延迟。

  • doAppend(Writer writer, FSWALEntry entry):

    // ... existing code ...@Overrideprotected void doAppend(Writer writer, FSWALEntry entry) throws IOException {writer.append(entry);}
    

    实现非常直接,就是调用 Writer 对象的 append 方法,将序列化后的 WALEdit 写入 HDFS 的输出流缓冲区。

  • onWriterReplaced(Writer nextWriter):

    // ... existing code ...@Overrideprotected void onWriterReplaced(Writer nextWriter) {if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) {this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream();} else {this.hdfs_out = null;}this.createSyncRunnersAndStart();}
    

    当日志滚动、Writer 被替换时,这个回调方法会被触发。它会更新 hdfs_out 字段以指向新的输出流,并且重新创建并启动 SyncRunner 线程池。这是因为旧的 SyncRunner 线程持有的是对旧 Writer 的引用,必须为新的 Writer 创建新的 SyncRunner

总结

FSHLog 是 AbstractFSWAL 框架的一个经典、稳健且功能完备的实现。它不仅忠实地执行了父类定义的日志管理流程,更通过引入并发的 SyncRunner 线程池主动的低副本检测机制,在性能和可靠性上做了关键的增强。虽然在 HBase 3.x 之后,默认的 WAL 实现被 AsyncFSWAL 取代以追求更低的延迟,但 FSHLog 的设计和实现仍然是理解 HBase WAL 机制的绝佳范本。

多个SyncRunner线程调用同一个writer实例的sync()会有并发问题吗

答案是不会,但这套设计能够成功运行,其核心原因在于**sync()操作本身的特性和底层HDFS客户端的线程安全保证**。

让我们来详细拆解一下:

append (追加写入) vs sync (刷盘)

首先,我们要严格区分两种操作:

  • append (追加数据):这个操作是向内存缓冲区中添加新的数据。这个过程是严格串行的。正如我上次提到的,只有一个消费者线程负责调用 writer.append()。这保证了所有的数据编辑(WALEdit)是按顺序进入缓冲区的,绝对不会出现多线程同时写缓冲区导致数据交错或损坏的情况。

  • sync (刷盘):这个操作是将已经存在于缓冲区中的数据持久化到磁盘。多个SyncRunner线程执行的就是这个操作。它们并不添加新数据,只是触发一个“刷盘”的命令。

 writer.sync() 的线程安全性

多个线程同时在同一个文件输出流上调用sync(),难道不会冲突吗?

之所以不会冲突,是因为FSHLogwriter底层依赖的Hadoop FSDataOutputStreamhsync()hflush()方法本身就是线程安全的

  • HDFS客户端库在设计时已经考虑到了并发调用的场景。当多个线程在同一个文件流(FSDataOutputStream)上调用hsync()时,HDFS客户端内部的锁机制会确保这些调用被正确地同步。这意味着,即使有多个SyncRunner同时请求sync,在最底层执行刷盘操作的核心逻辑也是互斥的,不会导致文件损坏或数据不一致。
  • 可以把FSDataOutputStream看作一个线程安全的对象,它自己管理着对底层数据流的并发访问。

比如(某条条用路径调用)DFSOutPutStream 的 flushOrSync方法有做同步

    public void hsync(EnumSet<HdfsDataOutputStream.SyncFlag> syncFlags) throws IOException {try (TraceScope ignored = this.dfsClient.newPathTraceScope("hsync", this.src)) {this.flushOrSync(true, syncFlags);}}
private void flushOrSync(boolean isSync, EnumSet<HdfsDataOutputStream.SyncFlag> syncFlags) throws IOException {this.dfsClient.checkOpen();this.checkClosed();try {long lastBlockLength = -1L;boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);boolean endBlock = syncFlags.contains(SyncFlag.END_BLOCK);long toWaitFor;synchronized(this) {int numKept = this.flushBuffer(!endBlock, true);DFSClient.LOG.debug("DFSClient flush():  bytesCurBlock={}, lastFlushOffset={}, createNewBlock={}", new Object[]{this.getStreamer().getBytesCurBlock(), this.lastFlushOffset, endBlock});if (this.lastFlushOffset != this.getStreamer().getBytesCurBlock()) {assert this.getStreamer().getBytesCurBlock() > this.lastFlushOffset;this.lastFlushOffset = this.getStreamer().getBytesCurBlock();if (isSync && this.currentPacket == null && !endBlock) {this.currentPacket = this.createPacket(this.packetSize, this.chunksPerPacket, this.getStreamer().getBytesCurBlock(), this.getStreamer().getAndIncCurrentSeqno(), false);}} else if (isSync && this.getStreamer().getBytesCurBlock() > 0L && !endBlock) {this.currentPacket = this.createPacket(this.packetSize, this.chunksPerPacket, this.getStreamer().getBytesCurBlock(), this.getStreamer().getAndIncCurrentSeqno(), false);} else if (this.currentPacket != null) {this.currentPacket.releaseBuffer(this.byteArrayManager);this.currentPacket = null;}if (this.currentPacket != null) {this.currentPacket.setSyncBlock(isSync);this.enqueueCurrentPacket();}if (endBlock && this.getStreamer().getBytesCurBlock() > 0L) {this.currentPacket = this.createPacket(0, 0, this.getStreamer().getBytesCurBlock(), this.getStreamer().getAndIncCurrentSeqno(), true);this.currentPacket.setSyncBlock(this.shouldSyncBlock || isSync);this.enqueueCurrentPacket();this.getStreamer().setBytesCurBlock(0L);this.lastFlushOffset = 0L;} else {this.getStreamer().setBytesCurBlock(this.getStreamer().getBytesCurBlock() - (long)numKept);}toWaitFor = this.getStreamer().getLastQueuedSeqno();}this.getStreamer().waitForAckedSeqno(toWaitFor);if (updateLength || this.getStreamer().getPersistBlocks().get()) {synchronized(this) {if (!this.getStreamer().streamerClosed() && this.getStreamer().getBlock() != null) {lastBlockLength = this.getStreamer().getBlock().getNumBytes();}}}if (this.getStreamer().getPersistBlocks().getAndSet(false) || updateLength) {try {this.dfsClient.namenode.fsync(this.src, this.fileId, this.dfsClient.clientName, lastBlockLength);} catch (IOException ioe) {DFSClient.LOG.warn("Unable to persist blocks in hflush for " + this.src, ioe);this.checkClosed();throw ioe;}}synchronized(this) {if (!this.getStreamer().streamerClosed()) {this.getStreamer().setHflush();}}} catch (InterruptedIOException interrupt) {throw interrupt;} catch (IOException var21) {IOException e = var21;DFSClient.LOG.warn("Error while syncing", var21);synchronized(this) {if (!this.isClosed()) {this.getStreamer().getLastException().set(e);this.closeThreads(true);}}throw var21;}}

底层有同步 为什么需要多个 SyncRunner

既然底层的hsync()是同步互斥的,那为什么还要设计多个SyncRunner线程呢?这不是多此一举吗?

这正是FSHLog设计的精妙之处,其目的在于降低请求的平均延迟

  • sync是一个非常耗时的I/O操作。如果只有一个SyncRunner,那么所有的sync请求都必须在一个队列里排队等待。一个RPC Handler线程发起了sync,就必须等到在它前面的所有sync都完成了,才能轮到它,等待时间会非常长。

  • 通过使用一个SyncRunner线程池,一个sync请求可以被提交给任何一个空闲的SyncRunner。虽然这些SyncRunner最终到底层HDFS时可能还是会排队,但FSHLog在应用层面实现了并发处理。这带来了几个好处:

    1. 快速响应:RPC Handler线程提交sync请求后可以更快地得到一个CompletableFuture并返回(或异步等待),而不用在FSHLog的单一入口点上排队。
    2. 批量处理SyncRunner的设计可以对请求进行批量完成。当一个sync操作完成后,它可以一次性地通知所有在此sync点之前提交的请求都已经完成,如completeSyncRequests方法的逻辑所示。
    3. 避免队头阻塞:避免了在应用层面因为一个慢的sync操作而阻塞后面所有的请求。

SyncRunnerrun方法,每个线程都在循环地获取请求并执行sync

// ... existing code ...private class SyncRunner extends Thread {
// ... existing code ...@Overridepublic void run() {while (!this.shutDown) {try {SyncRequest syncRequest = this.takeSyncRequest();// ...// All SyncRunner threads call sync() on the same 'writer' instance.// This is safe because the underlying FSDataOutputStream.hsync() is thread-safe.writer.sync(syncRequest.shouldUseHSync);// ...
// ... existing code ...

总结一下

“多线程写入WAL”这个描述,如果指的是append数据,那是不准确的,因为append是单线程的。如果指的是sync刷盘,那么描述是准确的,确实是多线程在sync。但这之所以安全,是因为**append的串行化保证了数据进入缓冲区的顺序,而sync的线程安全由底层的HDFS客户端保证**。整个设计是用多线程sync来优化延迟,而不是并行写入数据。

FSHLog 内部类SyncRunner

SyncRunner 是 FSHLog 实现高性能、高并发 WAL(预写日志)同步的关键组件。理解它的工作原理,对于理解 HBase 如何处理写请求的持久化至关重要。

为什么需要 SyncRunner

在 AbstractFSWAL 的设计中,consume() 线程负责将数据从 RingBuffer 中取出并写入(append)到文件系统的缓冲区。但是,仅仅 append 是不够的,数据可能还停留在操作系统的内存里,并没有真正落盘。为了保证数据的持久性,必须调用 hdfs_out.sync() 或 hdfs_out.hsync(),这个操作会强制将数据刷到物理磁盘上。

核心问题sync() 是一个非常耗时的 I/O 操作,它会阻塞当前线程直到磁盘完成写入。如果让 consume() 线程自己去执行 sync(),那么在 sync() 期间,consume() 线程就无法继续从 RingBuffer 中拉取和处理新的写请求,整个 WAL 的处理流水线就会被阻塞,导致写性能急剧下降。

解决方案FSHLog 引入了 SyncRunner 线程池。consume() 线程不再自己执行耗时的 sync(),而是将“同步请求”打包成一个 SyncRequest 对象,然后把它扔给一个专门的 SyncRunner 线程去处理。这样,consume() 线程就可以立即返回,继续处理下一批数据,实现了同步操作的异步化

正如代码注释中所说:

This call takes a while to complete. This is the longest pole adding edits to the WAL and this must complete to be sure all edits persisted. We run multiple threads sync'ing rather than one that just syncs in series so we have better latencies...

通过运行一个 SyncRunner 线程池(数量由 hbase.regionserver.hlog.syncer.count 配置,默认为5),可以并行处理多个 sync 请求,进一步提高了吞吐量和降低了延迟。

SyncRunner 的结构和关键成员

private class SyncRunner extends Thread {// Keep around last exception thrown. Clear on successful sync.private final BlockingQueue<SyncRequest> syncRequests;private volatile boolean shutDown = false;SyncRunner(final String name, final int maxHandlersCount) {super(name);// ...this.syncRequests = new LinkedBlockingQueue<>(maxHandlersCount * 3);}// ...
}
  • extends ThreadSyncRunner 本身就是一个线程。
  • syncRequests: 这是一个 LinkedBlockingQueue<SyncRequest>,是 SyncRunner 的核心。它是一个阻塞队列,用于接收来自 consume() 线程的同步请求。
    • consume() 线程是生产者,通过 syncRunners[i].offer(syncRequest) 向队列中添加请求。
    • SyncRunner 线程是消费者,通过 syncRequests.take() 从队列中取出请求来处理。
  • shutDown: 一个 volatile 的布尔标志,用于优雅地停止 SyncRunner 线程。

SyncRunner 的工作流程(run() 方法)

run() 方法是 SyncRunner 线程的主循环,我们分步解析其逻辑。

@Override
public void run() {while (!this.shutDown) {try {// 1. 获取同步请求SyncRequest syncRequest = this.takeSyncRequest();// 2. 状态检查// ... (检查 writer 是否被替换或损坏) ...// 3. 确定本次 sync 的序列号上界long currentSequenceToUse = syncRequest.sequenceWhenSync;if (currentHighestProcessedAppendTxid > currentSequenceToUse) {currentSequenceToUse = currentHighestProcessedAppendTxid;}// 4. 执行同步并处理结果Exception lastException = null;try {writer.sync(syncRequest.shouldUseHSync);} catch (IOException e) {// ...lastException = e;} finally {if (lastException != null) {this.completeExceptionallySyncRequests(syncRequest, lastException);} else {this.completeSyncRequests(syncRequest, currentSequenceToUse);}}} catch (InterruptedException e) {// ...} catch (Throwable t) {// ...}}// 5. 关闭时清理this.clearSyncRequestsWhenShutDown();
}
步骤 1: 获取同步请求 (takeSyncRequest())
private SyncRequest takeSyncRequest() throws InterruptedException {while (true) {SyncRequest syncRequest = this.syncRequests.take();long currentHighestSyncedSequence = highestSyncedTxid.get();if (syncRequest.sequenceWhenSync < currentHighestSyncedSequence) {syncRequest.completableFuture.complete(currentHighestSyncedSequence);continue;}return syncRequest;}
}
  • syncRequests.take(): 这是个阻塞操作。如果队列为空,SyncRunner 线程会在这里睡眠,直到有新的 SyncRequest 到来。
  • 优化点: 在真正执行耗时的 sync 之前,它会检查这个请求的序列号 (sequenceWhenSync) 是否已经小于等于全局已经同步完成的最高序列号 (highestSyncedTxid)。如果是,说明这个请求所期望的持久化保证已经被后续的某个 sync 操作顺便完成了。这时就没必要再做一次 sync,直接 complete 这个请求的 Future,然后 continue 去取下一个请求。这是一个非常重要的性能优化,避免了冗余的 sync 操作。
步骤 2: 状态检查
// ...
Writer currentWriter = writer;
if (currentWriter != syncRequest.writer) {syncRequest.completableFuture.completeExceptionally(WITER_REPLACED_EXCEPTION);continue;
}
if (writerBroken) {syncRequest.completableFuture.completeExceptionally(WITER_BROKEN_EXCEPTION);continue;
}
// ...

在执行 sync 前,会进行一系列安全检查:

  • Writer 是否被替换SyncRequest 中保存了它被创建时所对应的 writer。如果当前的 writer 已经不是那个了(通常是因为发生了日志滚动),那么对旧的 writer 进行 sync 就没有意义了。此时会直接让请求失败。
  • Writer 是否损坏: 如果 writer 已被标记为损坏,同样直接让请求失败。
步骤 3: 确定序列号上界
// ...
long currentSequenceToUse = syncRequest.sequenceWhenSync;
// ...
long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
// ...
if (currentHighestProcessedAppendTxid > currentSequenceToUse) {currentSequenceToUse = currentHighestProcessedAppendTxid;
}
// ...
  • syncRequest.sequenceWhenSync 是 consume() 线程发起这个 sync 请求时,它期望能同步到的序列号。
  • highestProcessedAppendTxid 是 consume() 线程已经 append 到文件缓冲区的最新序列号。
  • 在 SyncRunner 准备执行 sync 时,consume() 线程可能已经又 append 了更多的数据。所以,这次 sync 实际能保证的持久化位置,应该是这两者中的较大值。currentSequenceToUse 就代表了本次 sync 成功后,可以安全确认的最高序列号。
步骤 4: 执行同步并完成 Future
// ...
try {writer.sync(syncRequest.shouldUseHSync);
} catch (IOException e) {lastException = e;
} finally {if (lastException != null) {this.completeExceptionallySyncRequests(syncRequest, lastException);} else {this.completeSyncRequests(syncRequest, currentSequenceToUse);}
}
// ...
  • writer.sync(...): 这是真正执行文件系统同步的地方,也是最耗时的操作。
  • finally 块: 无论成功还是失败,都必须处理结果。
    • 失败: 调用 completeExceptionallySyncRequests(),它会用异常完成当前 syncRequest 的 Future,并且会批量地让所有后续排队的、属于同一个 writer 的请求也都快速失败。
    • 成功: 调用 completeSyncRequests()
completeSyncRequests() 的批量完成机制
private void completeSyncRequests(SyncRequest syncRequest, long syncedSequenceId) {if (syncRequest != null) {syncRequest.completableFuture.complete(syncedSequenceId);}while (true) {SyncRequest head = this.syncRequests.peek();if (head == null || head.sequenceWhenSync > syncedSequenceId) {break;}head.completableFuture.complete(syncedSequenceId);this.syncRequests.poll();}
}

这是另一个关键的**批量处理(Batching)**优化。一次成功的 sync 会将截止到 syncedSequenceId 的所有数据都持久化。因此,不仅要完成当前这个 syncRequest 的 Future,还应该检查队列中排在前面的其他 SyncRequest。如果它们的期望序列号也小于等于 syncedSequenceId,那么它们也一并被满足了。这个 while 循环就是遍历并完成所有这些“搭便车”的请求,极大地提高了效率。

步骤 5: 关闭与清理
void shutDown() {try {this.shutDown = true;this.interrupt();this.join();} catch (InterruptedException e) {//...}
}private void clearSyncRequestsWhenShutDown() {while (true) {SyncRequest syncRequest = this.syncRequests.poll();if (syncRequest == null) {break;}syncRequest.completableFuture.completeExceptionally(WAL_CLOSE_EXCEPTION);}
}
  • shutDown() 方法通过设置 shutDown 标志并中断线程来发起关闭。中断是为了唤醒可能在 take() 操作上阻塞的线程。
  • run() 方法退出循环后,会调用 clearSyncRequestsWhenShutDown(),将队列中所有剩余的、未处理的请求全部以 WAL_CLOSE_EXCEPTION 异常来完成,确保不会有任何请求被无限期地挂起。

总结

FSHLog.SyncRunner 是一个精心设计的多线程消费者模型,它专门用于处理 WAL 中最耗时的 sync 操作。其核心设计思想包括:

  1. 异步化: 将 sync 操作从主消费逻辑中剥离,避免阻塞关键路径。
  2. 并发化: 使用多个 SyncRunner 线程并行执行 sync,提高整体吞吐量。
  3. 批量化: 一次 sync 操作会批量完成多个客户端的 sync 请求,摊薄了单次 I/O 的成本。
  4. 健壮性: 包含了详细的异常处理、状态检查和优雅关闭的逻辑,保证了系统的稳定可靠。

通过 SyncRunner,HBase 将 WAL 的写入过程分解成了一个高效的流水线:RPC Handler -> RingBuffer -> consume() append -> SyncRunner sync(),每一级都通过异步和批量处理来最大化性能。

FSHLog.doWriterSync 

这个方法是 FSHLog 实现其核心的异步 sync 机制的起点,是连接 AbstractFSWAL 的通用消费逻辑和 FSHLog 具体的多线程 SyncRunner 实现之间的桥梁。

首先,我们要理解这个方法在整个 WAL 写入流程中的位置:

  1. 生产者(RPC Handler): 调用 wal.sync() 请求数据持久化。
  2. AbstractFSWALsync() 方法将一个 SyncFuture 对象放入 RingBuffer。
  3. 消费者(consume() 线程): 从 RingBuffer 中取出 SyncFuture,并判断是否需要触发一次文件系统的 sync 操作(例如,数据量达到阈值)。
  4. 触发同步: 如果需要 syncconsume() 线程会调用 sync(writer) 方法。
  5. sync(writer): 这个方法内部会调用 doWriterSync(...)

所以,doWriterSync 的直接调用者是 AbstractFSWAL 中的 sync(writer) 方法,它代表着“决定执行一次物理同步”的时刻。

 doWriterSync 方法源码分析

我们来看一下 FSHLog.java 中这个方法的具体实现:

@Override
protected CompletableFuture<Long> doWriterSync(Writer writer, boolean shouldUseHSync,long txidWhenSync) {CompletableFuture<Long> future = new CompletableFuture<>();SyncRequest syncRequest = new SyncRequest(writer, shouldUseHSync, txidWhenSync, future);this.offerSyncRequest(syncRequest);return future;
}

这个方法的逻辑非常清晰和简洁,可以分解为三步:

步骤 1: 创建一个 CompletableFuture
CompletableFuture<Long> future = new CompletableFuture<>();
  • CompletableFuture 是 Java 8 引入的强大的异步编程工具。它代表一个未来某个时刻才会完成的操作的结果。
  • 在这里,它扮演着一个“凭证”的角色。doWriterSync 的调用者(即 sync(writer) 方法)会得到这个 future。调用者可以给这个 future 添加回调函数(通过 addListener),以便在 sync 操作最终完成时得到通知(无论是成功还是失败)。
  • 这个 future 的泛型是 Long,代表 sync 成功后,返回的已同步的最高事务 ID(txid)。
步骤 2: 封装一个 SyncRequest
SyncRequest syncRequest = new SyncRequest(writer, shouldUseHSync, txidWhenSync, future);

doWriterSync 并不自己执行耗时的 sync 操作。相反,它将这次同步所需的所有信息打包成一个 SyncRequest 对象。我们来看一下 SyncRequest 的定义:

static class SyncRequest {private final Writer writer;private final boolean shouldUseHSync;private final long sequenceWhenSync;private final CompletableFuture<Long> completableFuture;public SyncRequest(Writer writer, boolean shouldUseHSync, long txidWhenSync,CompletableFuture<Long> completableFuture) {this.writer = writer;this.shouldUseHSync = shouldUseHSync;this.sequenceWhenSync = txidWhenSync;this.completableFuture = completableFuture;}
}

这个对象包含了:

  • writer: 需要对其执行 sync 操作的 WAL Writer 实例。
  • shouldUseHSync: 布尔值,指示这次同步应该使用 hsync(保证数据和元数据落盘)还是 hflush(只保证数据到达 DataNode)。
  • sequenceWhenSync: 发起这次 sync 时,期望同步到的事务 ID。
  • completableFuture: 就是上一步创建的那个 future。这个 future 被一起打包,以便 SyncRunner 在完成 sync 操作后,知道该去完成哪个“凭证”。

这种将参数和回调机制(CompletableFuture)封装成一个对象的模式,是典型的命令模式(Command Pattern)的应用。

步骤 3: 将请求提交给 SyncRunner (offerSyncRequest)
this.offerSyncRequest(syncRequest);

这是最关键的一步,它将 SyncRequest 这个“命令”发送给后台的 SyncRunner 线程池去执行。我们递归分析 offerSyncRequest 方法:

private void offerSyncRequest(SyncRequest syncRequest) {for (int i = 0; i < this.syncRunners.length; i++) {this.syncRunnerIndex = (this.syncRunnerIndex + 1) % this.syncRunners.length;if (this.syncRunners[this.syncRunnerIndex].offer(syncRequest)) {return;}}syncRequest.completableFuture.completeExceptionally(new IOException("There is no available syncRunner."));
}
  • 轮询(Round-Robin)分发:
    • this.syncRunnerIndex = (this.syncRunnerIndex + 1) % this.syncRunners.length; 这行代码实现了一个简单的轮询调度算法。它确保了 sync 请求被均匀地分发到所有的 SyncRunner 线程上,避免了单个 SyncRunner 成为瓶颈。
  • 非阻塞提交 (offer):
    • this.syncRunners[this.syncRunnerIndex].offer(syncRequest): 这里调用的是 SyncRunner 内部阻塞队列的 offer 方法。offer 是一个非阻塞方法,如果队列已满,它会立即返回 false,而不会阻塞 consume() 线程。
  • 失败处理:
    • 如果 offer 成功,方法立即 return
    • 如果 for 循环遍历了所有的 SyncRunner,发现它们的请求队列都满了(这是一个极端的、系统高负载的标志),那么 offer 会一直失败。
    • 在这种情况下,FSHLog 会主动让这次 sync 请求失败:syncRequest.completableFuture.completeExceptionally(...)。这是一种保护机制,防止请求无限期积压。
步骤 4: 返回 Future
return future;

doWriterSync 方法本身立即返回。它返回的是第一步创建的那个 CompletableFuture。调用者 sync(writer) 拿到这个 future 后,会通过 addListener 为其注册回调。当远端的 SyncRunner 线程最终完成(或失败)这个 sync 请求时,它会调用 syncRequest.completableFuture.complete(...) 或 completeExceptionally(...),从而触发之前注册的回调,整个异步流程闭环。

对比 AsyncFSWAL

值得注意的是,FSHLog 的兄弟类 AsyncFSWAL 中 doWriterSync 的实现截然不同:

@Override
protected CompletableFuture<Long> doWriterSync(AsyncWriter writer, boolean shouldUseHsync,long txidWhenSyn) {return writer.sync(shouldUseHsync);
}

AsyncFSWAL 使用了基于 Netty 的异步 I/O Writer。它的 writer.sync() 方法本身就是非阻塞的,会直接返回一个 CompletableFuture。因此,它不需要像 FSHLog 那样维护一个复杂的 SyncRunner 线程池来模拟异步,而是直接利用了底层 I/O 库提供的异步能力。

总结

FSHLog.doWriterSync 方法是 FSHLog 实现其异步持久化策略的核心枢纽。它本身不执行任何耗时操作,而是扮演了一个任务分发者的角色:

  1. 封装命令: 将一个同步请求的所有上下文(Writerhsync标志、事务ID)和一个用于回调的 CompletableFuture 打包成一个 SyncRequest 对象。
  2. 异步提交: 通过轮询和非阻塞的 offer 方式,将这个命令高效、均衡地分发给后台的 SyncRunner 线程池。
  3. 立即返回凭证: 立即将 CompletableFuture 这个“凭证”返回给上层调用者,使得主消费线程可以继续工作,不被 I/O 操作阻塞。

这个设计完美地体现了生产者-消费者模式命令模式,将任务的“提交”和“执行”彻底解耦,是 FSHLog 能够在高并发下实现高性能写入的关键所在。

http://www.lryc.cn/news/614325.html

相关文章:

  • 光猫、路由器和交换机
  • DuoPlus支持导入文件批量配置云手机参数,还优化了批量操作和搜索功能!
  • 快速上手 Ollama:强大的开源语言模型框架
  • git如何使用和操作命令?
  • Lattice Radiant 下载ROM以及逻辑分析仪调试
  • 如何在 Ubuntu 24.04 LTS 或 22.04/20.04 上安装 Apache Maven
  • VS Code 快捷键快速插入带年月日时分秒的时间注释
  • OpenAI 最新开源模型 gpt-oss (Windows + Ollama/ubuntu)本地部署详细教程
  • 【Lua】XLua一键构建工具
  • react+echarts实现变化趋势缩略图
  • 我的c#用到Newtonsoft.Json.dll,Fleck.dll这两个dll能否打到一个exe 中,而不是一起随着exe拷贝
  • 无人机仿真环境搭建
  • 使用pytest对接口进行自动化测试
  • 微软XBOX游戏部门大裁员
  • QS菜单栏的安全与隐私Tile组件(GMS中的)加载逻辑
  • 使用 C# 通过 .NET 框架开发应用程序的安装与环境配置
  • Godot ------ 通过鼠标对节点进行操作
  • 僵尸进程、孤儿进程、进程优先级、/proc 文件系统、CRC 与网络溢出问题处理(实战 + 原理)
  • 强制用户更改WordPress密码的重要性及实现方法
  • Linux 内存管理之page folios
  • 电脑定时开关机终极指南
  • 静态路由主备切换
  • 2025产品经理接单经验分享与平台汇总
  • 腾讯云 CodeBuddy IDE:可以使用gpt5的ide
  • 操作系统-实验-进程
  • CVRF 是什么?微软弃用 MS 编号后,网络安全的下一个标准
  • 文件结构树的├、└、─ 符号
  • Java文件操作和IO
  • R语言代码加密(1)
  • 无人机航拍数据集|第9期 无人机风力电机表面损伤目标检测YOLO数据集2995张yolov11/yolov8/yolov5可训练