当前位置: 首页 > news >正文

KRaft 角色状态设计模式:从状态理解 Raft

这些状态类是 Raft 协议行为的核心载体。它们包含转移逻辑 和 节点在特定状态下的所有行为和数据。

QuorumState

它是 KRaft 客户端实现中状态管理的核心,扮演着“状态机上下文(Context)”和“状态转换协调者”的关键角色。

QuorumState 是整个 Raft 状态机的“大脑”和“协调中心”。它的核心职责是:

  • 维护当前状态: 持有一个 volatile EpochState state 字段,该字段引用了当前节点所处的具体状态对象(如 FollowerStateLeaderState 等)。
  • 执行状态转换: 提供一系列 transitionToXXX 方法(如 transitionToCandidatetransitionToFollower),这些方法负责创建新的状态对象并替换旧的,从而完成状态的切换。
  • 保证转换的有效性: 在执行状态转换前,会进行严格的检查,确保转换是合法的。例如,只有 CandidateState 才能转换到 LeaderState
  • 持久化选举状态: 关键的选举状态(如当前任期、投票给了谁)需要被持久化,以便节点重启后能恢复。QuorumState 负责调用 QuorumStateStore 来完成这一任务。
  • 提供统一视图: 向外层(KafkaRaftClient)提供一个统一的、稳定的接口来查询当前 Raft 集群的状态(如当前 Leader、任期、高水位等),而无需关心内部具体是哪个状态对象在工作。

从类的注释中可以清晰地看到它对所有可能的状态转换路径进行了详细的定义:

// ... existing code ...
/*** This class is responsible for managing the current state of this node and ensuring* only valid state transitions. Below we define the possible state transitions and* how they are triggered:** Resigned transitions to:*    Unattached:  After learning of a new election with a higher epoch, or expiration of the election timeout*    Follower:    After discovering a leader with a larger epoch** Unattached transitions to:*    Unattached:  After learning of a new election with a higher epoch or after giving a binding vote*    Prospective: After expiration of the election timeout*    Follower:    After discovering a leader with an equal or larger epoch** ... (and so on for all other states)*/
public class QuorumState {
// ... existing code ...

核心字段

QuorumState 聚合了 Raft 节点运行所需的各种上下文信息。

// ... existing code ...
public class QuorumState {private final OptionalInt localId;private final Uuid localDirectoryId;private final Time time;private final Logger log;private final QuorumStateStore store;private final KRaftControlRecordStateMachine partitionState;private final Endpoints localListeners;private final SupportedVersionRange localSupportedKRaftVersion;private final Random random;private final int electionTimeoutMs;private final int fetchTimeoutMs;
// ... existing code ...private volatile EpochState state;// ... existing code ...
}
  • state: 最核心的字段,一个 volatile 引用,指向当前的状态对象。volatile 保证了多线程间的可见性。
  • localIdlocalDirectoryId: 本地节点的唯一标识。OptionalInt 的使用表明节点可能作为无 ID 的观察者(Observer)运行。
  • storeQuorumStateStore 的实例,通常是 FileQuorumStateStore,负责将选举状态(ElectionState)读写到磁盘上的 quorum-state 文件中。
  • partitionStateKRaftControlRecordStateMachine 的实例,它负责管理 Voter 集合(VoterSet)的状态。QuorumState 需要从它这里获取最新的 Voter 信息。
  • electionTimeoutMsfetchTimeoutMs: 选举超时和 Fetch 超时的配置值,用于创建状态对象时传入。
  • timerandomlogContext: 时间、随机数生成器、日志上下文等工具类。

初始化逻辑 (initialize)

initialize 方法是 QuorumState 生命周期的起点,它负责在节点启动时,根据持久化的状态和日志状态,决定节点应该进入哪个初始状态。

// ... existing code ...public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateException {// 1. 从 store 读取上次持久化的选举状态ElectionState election = readElectionState();final EpochState initialState;// ... 一系列复杂的 if-else if-else 判断 ...// 2. 根据 election state 和日志状态决定初始状态if (localId.isPresent() && election.isLeader(localId.getAsInt())) {// 如果上次是 Leader,则初始化为 ResignedStateinitialState = new ResignedState(...);} else if (localId.isPresent() &&election.isVotedCandidate(...)) {// 如果上次是 Candidate,则初始化为 CandidateStateinitialState = new CandidateState(...);} else if (election.hasLeader()) {// 如果知道 Leader,则初始化为 FollowerState (如果能找到 Leader 的地址)// 否则初始化为 UnattachedState// ...} else {// 其他情况,初始化为 UnattachedStateinitialState = new UnattachedState(...);}// 3. 完成初始状态的转换durableTransitionTo(initialState);}
// ... existing code ...

这个方法的逻辑非常严谨,覆盖了各种重启场景:

  • 如果重启前是 Leader,会进入 ResignedState。这是一种安全机制,可以防止它在同一个任期内为其他候选人投票,并确保日志的单调性。
  • 如果重启前是 Candidate,会重新进入 CandidateState
  • 如果重启前是 Follower,会尝试重新成为 Follower,但如果找不到 Leader 的网络地址,则会退化到 UnattachedState
  • 在大多数不确定的情况下,最安全的选择是进入 UnattachedState,等待集群的最新消息。

状态转换管理

QuorumState 提供了一系列 transitionToXXX 方法来驱动状态转换。这些方法是状态机运转的齿轮。

持久化转换 vs. 内存转换

QuorumState 定义了两种转换方式:

  1. durableTransitionTo(EpochState newState): 持久化转换。

    • 它首先调用 store.writeElectionState(...) 将新状态的选举信息(任期、投票给谁、Leader是谁)写入磁盘。
    • 然后调用 memoryTransitionTo(newState) 完成内存中状态对象的切换。
    • 适用场景: 用于那些会改变持久化选举状态的转换。例如,进入一个新的任期、投票给一个候选人、选举出新的 Leader。
  2. memoryTransitionTo(EpochState newState): 纯内存转换。

    • 它只在内存中用 newState 替换当前的 state 对象。
    • 适用场景: 用于那些不影响持久化选举状态的转换。例如,从 Leader 转换到 ResignedState,因为 ResignedState 是一个临时的软状态,重启后总会回到 ResignedState,所以无需持久化。
// ... existing code ...private void durableTransitionTo(EpochState newState) {log.info("Attempting durable transition to {} from {}", newState, state);store.writeElectionState(newState.election(), partitionState.lastKraftVersion());memoryTransitionTo(newState);}private void memoryTransitionTo(EpochState newState) {if (state != null) {state.close();}this.state = newState;log.info("Completed transition to {}", state);}
// ... existing code ...

作为状态上下文 (Context)

QuorumState 封装了内部状态的复杂性,对外提供了一致的查询接口。KafkaRaftClient 不需要知道当前是 FollowerState 还是 LeaderState,它只需要调用 QuorumState 的方法。

// ... existing code ...public int epoch() {return state.epoch(); // 委托给当前 state 对象}public OptionalInt leaderId() {ElectionState election = state.election(); // 委托给当前 state 对象if (election.hasLeader())return OptionalInt.of(state.election().leaderId());elsereturn OptionalInt.empty();}public boolean isLeader() {return state instanceof LeaderState; // 通过类型判断提供状态查询}public boolean isFollower() {return state instanceof FollowerState;}
// ... existing code ...

这些方法将具体的行为委托给了 this.state 对象,或者通过 instanceof 检查来回答关于当前状态的问题。这正是状态设计模式中上下文(Context)对象的典型实现。

总结

QuorumState 是 KRaft 状态管理的核心枢纽。它通过应用状态设计模式,将复杂的 Raft 协议逻辑清晰地分解到各个状态类中。

  • 它作为上下文(Context),聚合了所有必要的信息,并为上层提供了统一的交互接口。
  • 它作为协调者,严格管理着状态之间的转换,并通过与 QuorumStateStore 的交互,保证了选举状态的持久性和节点重启后的一致性。
  • 它的 initialize 方法是系统鲁棒性的重要体现,确保了节点在任何情况下都能以一个安全、一致的状态启动。

理解了 QuorumState 的设计,就等于掌握了 KRaft 客户端状态机如何组织和运转的蓝图。

EpochState接口

在 Kafka 的 Raft 实现中(位于 org.apache.kafka.raft 包下),EpochState 扮演着核心角色。Raft 协议将时间划分为一个个连续的任期(Term),在 Kafka 的实现中这被称为 Epoch。在任何一个给定的 Epoch 中,一个 Raft 节点都必然处于某一种状态,例如:领导者(Leader)、跟随者(Follower)或候选人(Candidate)。

EpochState 接口就是为了抽象和统一这些不同状态下的共同行为和属性。无论是 Leader、Follower 还是 Candidate,它们都共享一些基本特征:

  • 它们都属于某一个特定的 epoch
  • 它们都需要处理投票请求 (canGrantVote)。
  • 它们都需要能提供当前的选举状态 (election)。

通过定义这样一个接口,代码变得更加模块化和可扩展。Raft 的核心逻辑可以面向 EpochState 接口编程,而不用关心当前节点具体是哪一种状态,从而简化了状态切换和管理的复杂性。

EpochState 接口继承了 java.io.Closeable。这是一个非常重要的设计细节。

/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with//... 代码省略 ...
package org.apache.kafka.raft;import java.io.Closeable;
import java.util.Optional;public interface EpochState extends Closeable {
//... 代码省略 ...

Closeable 接口只有一个方法:void close() throws IOException。一个类实现 Closeable 意味着它的实例管理着需要被显式关闭的资源,例如文件句柄、网络连接或定时器等。

为什么 EpochState 需要 Closeable?

不同的 Raft 状态可能需要管理不同的生命周期性资源。例如:

  • Follower/Candidate/ProspectiveState: 这些状态通常需要一个选举定时器 (electionTimer)。如果在一个超时时间内没有收到 Leader 的心跳,Follower 就会转变为 Candidate并发起选举。当状态切换时(比如从 Follower 切换到 Leader),旧状态的定时器就需要被取消或关闭,以防止不必要的选举和资源泄漏。
  • Leader: Leader 状态可能需要管理与所有 Follower 的心跳定时器。

通过继承 CloseableEpochState 的实现类可以被 try-with-resources 语句管理,这确保了当一个状态对象不再被使用时,它的 close() 方法会被自动调用,从而安全地释放其占有的资源。

例如,在 ResignedState.java 中,close() 方法是空的,因为它不管理需要释放的资源。

//... 代码省略 ...@Overridepublic void close() {}
}

但在其他状态(如 FollowerState 或 CandidateState)中,close() 方法会有关闭定时器等重要逻辑。

方法

//... 代码省略 ...
public interface EpochState extends Closeable {default Optional<LogOffsetMetadata> highWatermark() {return Optional.empty();}/*** Decide whether to grant a vote to a replica.** It is the responsibility of the caller to invoke* {@link QuorumState#unattachedAddVotedState(int, ReplicaKey)} if a standard vote is granted.** @param replicaKey the id and directory of the replica requesting the vote* @param isLogUpToDate whether the replica's log is at least as up-to-date as receiver’s log* @param isPreVote whether the vote request is a PreVote (non-binding) or standard vote* @return true if it can grant the vote, false otherwise*/boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote);/*** Get the current election state, which is guaranteed to be immutable.*/ElectionState election();/*** Get the current (immutable) epoch.*/int epoch();/*** Returns the known endpoints for the leader.** If the leader is not known then {@code Endpoints.empty()} is returned.*/Endpoints leaderEndpoints();/*** User-friendly description of the state*/String name();
}
  • default Optional<LogOffsetMetadata> highWatermark()

    • 作用: 获取当前的高水位(High Watermark)。高水位是 Raft 协议中一个至关重要的概念,它代表了已经被集群中大多数节点确认(committed)的日志条目的最高偏移量。所有低于高水位的日志都可以被安全地应用到状态机。
    • 设计: 这是一个 default 方法,默认返回一个空的 Optional。这意味着不是所有状态都必须直接提供高水位信息。通常,只有 Leader 状态会主动计算和维护高水位。Follower 状态的高水位是从 Leader 的心跳消息中更新的。
  • boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote)

    • 作用: 这是 Raft 领导者选举的核心逻辑之一。当一个节点收到另一个节点(候选人)的投票请求时,它会调用此方法来决定是否将自己的选票投给该候选人。
    • 参数:
      • replicaKey: 请求投票的候选人的唯一标识。
      • isLogUpToDate: Raft 的一个安全机制,确保选票只会投给那些日志记录至少和当前节点一样新的候选人。
      • isPreVote: 区分“预投票”和“正式投票”。预投票是一种优化,用于在不增加 epoch 的情况下探测一个节点是否能赢得选举,从而避免网络分区恢复时产生不必要的选举干扰。
    • 返回值true 表示同意投票,false 表示拒绝。
  • ElectionState election()

    • 作用: 获取当前(不可变的)选举状态。ElectionState 对象封装了关于当前选举周期的详细信息,例如:当前 epoch、已知的 Leader 是谁、本节点投票给了谁、以及所有投票者的集合。
    • 设计: 返回一个不可变对象是并发编程中的一种良好实践,它保证了状态信息在被外部读取时不会被意外修改,增强了线程安全性。
  • int epoch()

    • 作用: 获取当前状态所属的任期号(epoch)。
    • 重要性: Epoch 在 Raft 中扮演逻辑时钟的角色。它单调递增,用于识别和拒绝来自旧任期的过时消息,是保证协议一致性的基础。
  • Endpoints leaderEndpoints()

    • 作用: 返回当前已知的 Leader 节点的网络地址信息(IP和端口)。
    • 用途: 使得 Follower 节点和客户端能够知道将请求发送到哪里。如果当前 Leader 未知(例如,在选举期间),则会返回一个空的 Endpoints 对象。
  • String name()

    • 作用: 返回一个用户友好的状态名称字符串,例如 "Follower", "Candidate", "Leader", "Resigned" 等。
    • 用途: 主要用于日志记录、监控(Metrics)和调试,方便开发和运维人员快速了解节点当前所处的状态。

总结

EpochState 接口是 Kafka Raft 库中一个设计精良的核心抽象。它通过定义一组通用的行为和属性,成功地统一了 Raft 协议中各种复杂的状态。

  • 封装性与模块化: 它将不同状态的共性行为(如投票、提供 epoch 信息)封装起来,使得上层逻辑可以面向接口编程,降低了代码的耦合度。
  • 资源管理: 通过继承 Closeable,它强制实现类考虑资源释放问题,利用 try-with-resources 机制保证了如定时器等资源的安全性,避免了资源泄漏。
  • 不变性与线程安全: 像 election() 方法返回不可变对象的设计,体现了对并发环境下状态一致性和线程安全的重视。

总而言之,EpochState 为构建一个健壮、可维护、易于理解的 Raft 实现奠定了坚实的基础。

ElectionState 

// ... existing code ...
/*** Encapsulate election state stored on disk after every state change.*/
public final class ElectionState {
// ... existing code ...

正如类注释所说,ElectionState 的核心作用是封装每次状态变更后需要存储在磁盘上的选举状态

  • public final class ElectionState:
    • final: 意味着这个类不能被继承。这通常用于创建不可变 (Immutable) 对象。一旦 ElectionState 对象被创建,它的内部状态(如 epoch、leaderId 等)就不能再被修改。这在并发环境中是非常重要的,因为它天然是线程安全的,可以被自由共享而无需担心数据被意外篡改。

Raft 协议要求节点必须持久化一些关键状态,以便在节点重启后能够恢复,并保证协议的安全性。ElectionState 正是这些需要持久化的核心选举数据的在内存中的体现。

ElectionState 封装了 Raft 选举的四个关键信息:

// ... existing code ...
public final class ElectionState {private static final int UNKNOWN_LEADER_ID = -1;private static final int NOT_VOTED = -1;private final int epoch;private final OptionalInt leaderId;private final Optional<ReplicaKey> votedKey;// This is deprecated. It is only used when writing version 0 of the quorum state fileprivate final Set<Integer> voters;
// ... existing code ...
  • epochint 类型,代表 Raft 的任期号。这是 Raft 协议的逻辑时钟,用于识别过时的请求和保证一致性。
  • leaderIdOptionalInt 类型,代表当前任期的 Leader 节点 ID。它是一个 Optional,因为在选举期间可能还没有选出 Leader。
  • votedKeyOptional<ReplicaKey> 类型,代表在当前任期中,本节点投票给了哪个候选人。ReplicaKey 不仅包含节点 ID,还包含一个目录 ID (directoryId),这是为了支持 JBOD (Just a Bunch Of Disks) 架构,确保在同一台物理机上运行的多个 Kraft 节点(使用不同磁盘)能被唯一标识。
  • votersSet<Integer> 类型,代表当前参与投票的节点集合。注释明确指出这个字段已被弃用,仅为了兼容旧版本(version 0)的 quorum-state 文件格式而保留。新版本中,Voter Set 的管理已经和 ElectionState 分离。

对象的创建方式 (静态工厂方法)

ElectionState 的构造函数是包级私有的,外部无法直接 new。它通过一系列静态工厂方法来创建实例,这种方式使得代码更具可读性,因为方法名清晰地描述了所创建对象的状态。

  • ElectionState.withVotedCandidate(int epoch, ReplicaKey votedKey, Set<Integer> voters)

    • 创建一个表示已投票给某个候选人尚未选出 Leader 的状态。此时 leaderId 为空。
    • 例如,当一个节点作为 Candidate 启动并首先给自己投票时,就会处于这个状态。
  • ElectionState.withElectedLeader(int epoch, int leaderId, Optional<ReplicaKey> votedKey, Set<Integer> voters)

    • 创建一个表示已成功选举出 Leader 的状态。
    • 例如,当一个 Follower 收到 Leader 的心跳时,或一个 Candidate 赢得选举时,就会进入这个状态。
  • ElectionState.withUnknownLeader(int epoch, Set<Integer> voters)

    • 创建一个表示Leader 未知且尚未投票的状态。
    • 例如,当一个 Follower 的选举计时器超时,进入新一轮选举的初始阶段时,就会是这个状态。
  • ElectionState.fromQuorumStateData(QuorumStateData data)

    • 这是反序列化的入口,从一个 QuorumStateData 对象(从磁盘读取的数据结构)恢复成一个 ElectionState 内存对象。

序列化与持久化

ElectionState 是内存中的对象,它需要被转换成可持久化的格式。toQuorumStateData 方法就承担了这个责任。

// ... existing code ...public QuorumStateData toQuorumStateData(short version) {QuorumStateData data = new QuorumStateData().setLeaderEpoch(epoch).setLeaderId(leaderIdOrSentinel()).setVotedId(votedKey.map(ReplicaKey::id).orElse(NOT_VOTED));if (version == 0) {List<QuorumStateData.Voter> dataVoters = voters.stream().map(voterId -> new QuorumStateData.Voter().setVoterId(voterId)).collect(Collectors.toList());data.setCurrentVoters(dataVoters);} else if (version == 1) {data.setVotedDirectoryId(votedKey.flatMap(ReplicaKey::directoryId).orElse(ReplicaKey.NO_DIRECTORY_ID));} else {
// ... existing code ...}return data;}
// ... existing code ...

这段代码清晰地展示了版本兼容性处理:

  • 对于 version 0: 它会序列化 voters 列表,但不会序列化 votedDirectoryId
  • 对于 version 1: 它不再序列化 voters 列表,转而序列化 votedDirectoryId

这种设计使得 KRaft 协议可以在不停机的情况下进行升级和演进。

关键方法分析

  • isVotedCandidate(ReplicaKey nodeKey)

    // ... existing code ...
    public boolean isVotedCandidate(ReplicaKey nodeKey) {if (nodeKey.id() < 0) {throw new IllegalArgumentException("Invalid node key " + nodeKey);} else if (votedKey.isEmpty()) {return false;} else if (votedKey.get().id() != nodeKey.id()) {return false;} else if (votedKey.get().directoryId().isEmpty()) {// when the persisted voted directory id is not present assume that we voted for this candidate;// this happens when the kraft version is 0.return true;}return votedKey.get().directoryId().equals(nodeKey.directoryId());
    }
    // ... existing code ...
    

    这个方法用于检查当前节点是否投票给了 nodeKey 所代表的候选人。它的逻辑同样体现了向后兼容:如果持久化的 votedKey 中没有 directoryId(这发生在从 version 0 的状态文件恢复时),它会默认匹配成功,只比较 id。否则,它会同时比较 id 和 directoryId

  • leaderIdOrSentinel()

    // ... existing code ...
    public int leaderIdOrSentinel() {return leaderId.orElse(UNKNOWN_LEADER_ID);
    }
    // ... existing code ...
    

    这个方法提供了一种安全的获取 leaderId 的方式。如果 Leader 不存在,它不会抛出异常,而是返回一个哨兵值 -1。这在序列化到 QuorumStateData 时非常有用,因为协议消息格式通常要求一个整数字段而不是 Optional

在系统中的角色

ElectionState 并不是孤立存在的,它与 EpochState 的实现类(如 CandidateStateFollowerState 等)和 QuorumState 紧密协作。

  • EpochState -> ElectionState: 各种代表 Raft 节点当前状态的 EpochState 实现,在其 election() 方法中,会根据自身状态(如自己是不是 Leader,投票给了谁)构建一个对应的 ElectionState 对象。

    例如,在 CandidateState 中:

    // ... existing code ...
    @Override
    public ElectionState election() {return ElectionState.withVotedCandidate(epoch,ReplicaKey.of(localId, localDirectoryId),epochElection.voterIds());
    }
    // ... existing code ...
    

    CandidateState 总是认为自己是候选人,并且已经给自己投了票,所以它创建了一个 withVotedCandidate 的 ElectionState

  • QuorumState 使用 ElectionStateQuorumState 是管理 Raft 状态切换和持久化的核心类。当 Raft 状态发生改变时(例如,从 Follower 变成 Candidate),QuorumState 会从新的 EpochState 对象获取 ElectionState,然后调用 QuorumStateStore 将其写入磁盘,完成状态的持久化。

    可以在很多测试用例中看到这个流程,比如:

    // ... existing code ...
    store.writeElectionState(ElectionState.withElectedLeader(logEndEpoch, leader.id(), Optional.empty(), voters.voterIds()),kraftVersion
    );
    // ... existing code ...
    

总结

ElectionState 是 Kafka Raft 实现中一个设计精巧且至关重要的类。它是一个不可变的、代表持久化选举状态的值对象

  1. 封装核心状态: 它精确地封装了 Raft 选举所需的最少但关键的信息:epochleaderIdvotedKey
  2. 保证协议安全: 通过持久化这些状态,确保了节点在崩溃重启后不会违反 Raft 的安全原则(例如,在同一个任期内投票给多个候选人)。
  3. 促进代码清晰: 使用静态工厂方法和不可变性,使得代码更易于理解和维护,并保证了线程安全。
  4. 支持协议演进: 通过版本化的序列化/反序列化逻辑,支持了 KRaft 协议的平滑升级。

FollowerState 

public class FollowerState implements EpochState 定义了一个实现了 EpochState 接口的类。在 Raft 协议中,一个节点绝大多数时间都处于 Follower 状态。

Follower 的核心职责是

  • 被动地接收并复制来自 Leader 的日志条目。
  • 响应 Leader 的心跳请求,以表明自己还存活。
  • 如果在一个“选举超时”周期内没有收到 Leader 的任何消息,则认为 Leader 已失效,并将自己的状态转换为 Candidate,发起新一轮选举。

FollowerState 类就封装了作为 Follower 时的所有状态数据和行为逻辑。

// ... existing code ...
public class FollowerState implements EpochState {private final Logger log;private final int fetchTimeoutMs;private final int epoch;private final int leaderId;private final Endpoints leaderEndpoints;private final Optional<ReplicaKey> votedKey;private final Set<Integer> voters;// Used for tracking the expiration of both the Fetch and FetchSnapshot requestsprivate final Timer fetchTimer;// Used to track when to send another update voter requestprivate final Timer updateVoterPeriodTimer;/* Used to track if the replica has fetched successfully from the leader at least once since* the transition to follower in this epoch. If the replica has not yet fetched successfully,* it may be able to grant PreVotes.*/private boolean hasFetchedFromLeader = false;private Optional<LogOffsetMetadata> highWatermark;/* For kraft.version 0, track if the leader has received updated voter information from this* follower.*/private boolean hasUpdatedLeader = false;/* Used to track the currently fetching snapshot. When fetching snapshot regular Fetch request* are paused*/private Optional<RawSnapshotWriter> fetchingSnapshot = Optional.empty();
// ... existing code ...
  • epochleaderIdleaderEndpointsvotedKeyvoters: 这些是基本的选举信息,定义了当前任期、公认的 Leader 以及投票状态。
  • fetchTimer: 这是一个至关重要的选举定时器。Follower 每次收到 Leader 的有效消息(如 Fetch 或 FetchSnapshot 请求)时,都会重置这个定时器。如果定时器超时,就意味着与 Leader "失联"。
  • hasFetchedFromLeader: 一个布尔标记,用于记录在当前任期内,是否已经成功地从 Leader 获取过数据。这个标记在处理预投票(Pre-Vote)时有特殊作用。
  • highWatermark: Follower 所知的、已被集群提交的日志的最高位移。它由 Leader 在心跳消息中告知 Follower。
  • fetchingSnapshot: 如果 Follower 的日志落后 Leader 太多,它会通过接收快照来快速追赶。这个字段就用于管理正在接收的快照的状态。
  • updateVoterPeriodTimerhasUpdatedLeader: 这两个字段主要用于兼容旧版本的 KRaft 协议,处理 Voter 集合的更新逻辑。

心跳与选举超时 (fetchTimer)

这是 Follower 状态的核心机制。

  • hasFetchTimeoutExpired(long currentTimeMs): 检查 fetchTimer 是否已经超时。如果返回 true,外部逻辑(QuorumState)就会将节点状态从 Follower 切换到 Candidate,并发起选举。
  • resetFetchTimeoutForSuccessfulFetch(long currentTimeMs): 当 Follower 成功收到 Leader 的消息后,会调用此方法。它会重置 fetchTimer,并设置 hasFetchedFromLeader = true。这相当于一次“心跳续约”。
// ... existing code ...public boolean hasFetchTimeoutExpired(long currentTimeMs) {fetchTimer.update(currentTimeMs);return fetchTimer.isExpired();}public void resetFetchTimeoutForSuccessfulFetch(long currentTimeMs) {fetchTimer.update(currentTimeMs);fetchTimer.reset(fetchTimeoutMs);hasFetchedFromLeader = true;}
// ... existing code ...

投票逻辑 (canGrantVote)

Follower 的投票逻辑非常严格,因为它已经承认了一个 Leader。

// ... existing code ...@Overridepublic boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote) {if (isPreVote && !hasFetchedFromLeader && isLogUpToDate) {return true;}log.debug("Rejecting Vote request (preVote={}) from replica ({}) since we are in FollowerState with leader {} in " +"epoch {}, hasFetchedFromLeader={}, replica's log is up-to-date={}",isPreVote,replicaKey,leaderId,epoch,hasFetchedFromLeader,isLogUpToDate);return false;}
// ... existing code ...
  • 通常情况: 对于任何标准的投票请求 (isPreVote = false),Follower 都会拒绝
  • 特殊情况 (Pre-Vote): Follower 只有在满足以下所有条件时,才会同意一个预投票请求:
    1. 这是一个预投票请求 (isPreVote = true)。
    2. 在当前任期内,尚未成功从 Leader 处获取过任何数据 (!hasFetchedFromLeader)。
    3. 请求者的日志至少和自己一样新 (isLogUpToDate = true)。

这个设计的目的是处理一种特殊场景:一个网络分区后刚刚恢复的节点,它可能还处于 Follower 状态,但它的 Leader 实际上已经失效了。通过这个机制,它可以响应其他节点的预投票,从而帮助集群在不增加任期号(epoch)的情况下,确认是否可以开始一次有效的选举。一旦它成功从 Leader 获取数据 (hasFetchedFromLeader 变为 true),它就会坚定地跟随当前 Leader,拒绝所有投票请求。

isPreVote:什么是预投票 (Pre-Vote)?

预投票是 Raft 协议的一个重要优化。

  • 问题场景: 想象一个节点因为网络问题被隔离了。在它被隔离期间,集群的其他节点选举出了新的 Leader,任期号(epoch)也增加了。当这个被隔离的节点网络恢复后,它的选举计时器会超时,然后它会立即增加自己的任期号并发起一次正式选举
  • 带来的麻烦: 这次选举是注定要失败的,因为它的日志是落后的。但它的高任期号投票请求会传播到集群,导致当前的合法 Leader 收到更高任期号的请求后,降级为 Follower,从而引发一次不必要的集群抖动和短暂的不可用。
  • 预投票的解决方案: 节点在发起正式选举前,先发起一轮预投票。预投票不会增加任期号,它只是询问其他节点:“如果我发起选举,你们会投给我吗?”。只有在获得大多数节点的预投票同意后,它才会真正增加任期号并发起正式选举。

这样,上面场景中的那个恢复节点在发起预投票时,就会被其他节点拒绝,因为它无法获得多数票,也就不会发起那次具有破坏性的正式选举了。

!hasFetchedFromLeader:为什么关心“是否已从 Leader 获取过数据”?

hasFetchedFromLeader 是一个布尔标记,它在 FollowerState 被创建时为 false。只有当这个 Follower 成功地从当前任期的 Leader 那里收到了心跳(Fetch 或 FetchSnapshot 请求)后,这个标记才会被设置为 true

// ... existing code ...public void resetFetchTimeoutForSuccessfulFetch(long currentTimeMs) {fetchTimer.update(currentTimeMs);fetchTimer.reset(fetchTimeoutMs);hasFetchedFromLeader = true;}
// ... existing code ...

这个标记代表了 Follower 对当前 Leader 的“信任程度”。

  • !hasFetchedFromLeader (false): “我知道 leaderId 是谁,但我还没收到过它的消息。我不确定它是否真的存活并且能联系到我。”
  • hasFetchedFromLeader (true): “我刚刚还跟 Leader 通过信,我很确定它活得好好的。”

那么hasFetchedFromLeader在什么情况下会回到 false 呢?答案是:当 Raft 节点的状态发生改变,导致需要创建一个新的 FollowerState 对象时。

FollowerState 实现了 EpochState 接口,它的生命周期与一个特定的 任期(Epoch) 紧密绑定。当任期发生变化时,旧的 EpochState 对象(无论是 FollowerState, CandidateState 还是 LeaderState)会被丢弃,并根据新的情况创建一个全新的 EpochState 对象。

考虑以下几种场景:

  • 选举超时: 当前的 FollowerState 的选举计时器超时了。节点会转换到 CandidateState,这个 FollowerState 对象就被废弃了。如果选举失败,又发现了一个新的 Leader,那么系统会创建一个新的 FollowerState 对象来跟随这个新 Leader。在这个新创建的对象里,hasFetchedFromLeader 重新被初始化为 false。
  • 发现更高任期的 Leader: Follower 收到一个来自更高任期(epoch)的 Leader 的消息。它会立即放弃当前的 FollowerState,并为这个新的、更高的任期创建一个新的 FollowerState 对象。同样,在这个新对象里,hasFetchedFromLeader 也是 false。

isLogUpToDate:Raft 的基本安全原则

这是 Raft 协议的基础。一个节点只会把票投给日志记录至少和自己一样“新”的候选人,以确保不会丢失任何已提交的数据。

综合分析:三个条件组合的智慧

现在我们把这三个条件组合起来看 if (isPreVote && !hasFetchedFromLeader && isLogUpToDate)

这个判断覆盖了一种非常特殊但重要的边界情况: 一个节点刚刚进入 Follower 状态,它知道 Leader 是谁,但在它收到来自这个 Leader 的第一次心跳之前的这个短暂窗口期,它其实并不完全确定这个 Leader 的有效性。

  • 如果 hasFetchedFromLeader 是 true: 这意味着 Follower 已经和 Leader 建立了稳定的通信。它坚信 Leader 是存活的,因此它会拒绝所有的投票/预投票请求,忠诚地跟随当前 Leader。这可以从下面的测试用例中得到验证,一旦调用了 resetFetchTimeoutForSuccessfulFetchcanGrantVote 就会一直返回 false

    // ... existing code ...
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testPreVoteAfterSuccessfulFetchFromLeader(boolean isLogUpToDate) {FollowerState state = newFollowerState(Set.of(1, 2, 3));state.resetFetchTimeoutForSuccessfulFetch(time.milliseconds());assertFalse(state.canGrantVote(ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, true));
    // ... existing code ...
    }
    
  • 如果 hasFetchedFromLeader 是 false: 在这个时间窗口内,如果收到了一个预投票请求,并且对方的日志是更新的,那么 Follower 会想:“我现在的 Leader 还没联系过我,可能它刚当选就挂了。这个发预投票的候选人看起来条件不错,我先同意它的‘意向投票’也无妨,这不会改变我的任期,很安全。”

总结一下

这个逻辑的核心是在不破坏 Raft 安全性的前提下,尽可能地提高系统的活性(Liveness)和稳定性

  1. 对于标准投票 (isPreVote = false): Follower 永远拒绝,因为它已经认定了本任期的 Leader。
  2. 对于预投票 (isPreVote = true):
    • 如果已经和 Leader 稳定通信 (hasFetchedFromLeader = true),则拒绝预投票,以维护当前 Leader 的稳定性。
    • 如果还未和 Leader 建立通信 (hasFetchedFromLeader = false),则同意预投票(前提是日志最新),给可能出现问题的集群(如 Leader 选出后立即宕机)一个快速恢复的机会。

这是一个非常优雅的权衡,确保了 Follower 既不会轻易地被一个过时的节点干扰,也不会在一个真正需要新选举的场景下“固执己见”。

高水位更新 (updateHighWatermark)

Follower 从 Leader 的消息中获取高水位信息,并用此方法更新本地状态。

// ... existing code ...public boolean updateHighWatermark(OptionalLong newHighWatermark) {
// ...if (highWatermark.isPresent()) {
// ...} else if (previousHighWatermark > updatedHighWatermark) {throw new IllegalArgumentException(String.format("Non-monotonic update of high watermark from %d to %d",previousHighWatermark,updatedHighWatermark));}
// ...}
// ...return true;}
// ... existing code ...

此方法强制要求高水位的更新必须是单调递增的。如果尝试用一个更小的值来更新高水位,会直接抛出异常。这是 Raft 协议保证数据一致性的一个基本安全要求。

快照同步 (fetchingSnapshot)

当需要通过快照来同步数据时,FollowerState 会管理这个过程。

// ... existing code ...public void setFetchingSnapshot(Optional<RawSnapshotWriter> newSnapshot) {fetchingSnapshot.ifPresent(RawSnapshotWriter::close);fetchingSnapshot = newSnapshot;}@Overridepublic void close() {fetchingSnapshot.ifPresent(RawSnapshotWriter::close);}
// ... existing code ...

setFetchingSnapshot 方法用于开始或更新一个快照接收过程,并确保旧的、未完成的快照写入器被关闭。close() 方法则确保当 FollowerState 对象被销毁时(例如,状态切换),任何进行中的快照写入器都能被正确关闭,以释放文件句柄等资源。

接口实现 (EpochState)

FollowerState 实现了 EpochState 接口的所有方法:

  • name(): 返回固定的字符串 "Follower"。
  • epoch(): 返回当前任期号 epoch
  • leaderEndpoints(): 返回已知的 Leader 的网络地址。
  • election(): 返回一个 ElectionState 对象,该对象明确指出 Leader 是谁。
    // ... existing code ...
    @Override
    public ElectionState election() {return ElectionState.withElectedLeader(epoch, leaderId, votedKey, voters);
    }
    // ... existing code ...
    
  • highWatermark(): 返回当前已知的高水位。
  • canGrantVote(): 实现了 Follower 特有的投票逻辑(如上文分析)。
  • close(): 实现了资源清理逻辑(如上文分析)。

总结

FollowerState 是 KRaft 中对 Raft Follower 角色的精确建模。它不仅仅是一个简单的数据容器,而是一个包含了复杂状态和逻辑的能动对象。

  • 核心职责: 它的核心是选举定时器 (fetchTimer),驱动了 Raft 的活性(liveness)—— 确保当 Leader 失效时,集群能够及时发起新的选举。
  • 保证安全: 它通过严格的投票逻辑 (canGrantVote) 和高水位单调性检查 (updateHighWatermark),保证了 Raft 协议的安全性(safety)—— 不会选出错误的 Leader,也不会提交未被确认的数据。
  • 状态封装: 它良好地封装了作为 Follower 所需的所有信息和行为,使得上层的状态机(QuorumState)可以清晰地进行状态转换和管理。

CandidateState

它是在 KRaft (Kafka Raft) 协议中代表**候选人(Candidate)**角色的核心实现,是整个选举过程的发起者和驱动者。

public class CandidateState implements NomineeState {
//...
}

CandidateState 实现了 NomineeState 接口,表明它是一个“提名”状态,即正在争取成为 Leader 的状态(另一个实现是 ProspectiveState)。

在 Raft 协议中,当一个 Follower 的选举计时器超时,或者一个节点刚启动时,它就会转变为 Candidate 状态。

Candidate 的核心职责是

  1. 增加任期号(epoch)。
  2. 给自己投一票。
  3. 向集群中所有其他 Voter 发送投票请求(VoteRequest)。
  4. 等待并处理其他节点的响应,直到以下三种情况之一发生:
    • 赢得选举: 获得超过半数节点的投票,成为 Leader。
    • 选举失败: 收到来自更高任期的 Leader 的消息,转变为 Follower。
    • 选举超时: 在一轮选举中票数被瓜分,没有任何节点获得多数票,选举超时后开始新一轮选举。

CandidateState 类就封装了作为 Candidate 时的所有状态数据和行为逻辑。

// ... existing code ...
public class CandidateState implements NomineeState {private final int localId;private final Uuid localDirectoryId;private final int epoch;private final EpochElection epochElection;private final Optional<LogOffsetMetadata> highWatermark;private final int electionTimeoutMs;private final Timer electionTimer;private final Logger log;
// ... existing code ...
  • localIdlocalDirectoryId: 标识当前节点自身的 ID。
  • epoch: 当前的任期号。Candidate 状态总是与一个新增加的任期号相关联。
  • epochElection: 这是一个至关重要的辅助类,专门用于跟踪本轮选举的票数。它内部记录了哪些节点投了赞成票,哪些投了反对票,并能判断是否已获得多数票(isVoteGranted())或选举是否已失败(isVoteRejected())。
  • electionTimer选举计时器。Candidate 会在 electionTimeoutMs 时间内等待选举结果。如果计时器超时,本轮选举就失败了。
  • highWatermark: 候选人所知的、已被提交的日志的最高位移。这个信息会包含在投票请求中,用于让其他节点判断该候选人的日志是否足够新。

构造与初始化

CandidateState 的构造函数揭示了它被创建时的关键动作:

// ... existing code ...protected CandidateState(Time time,int localId,Uuid localDirectoryId,int epoch,VoterSet voters,Optional<LogOffsetMetadata> highWatermark,int electionTimeoutMs,LogContext logContext) {
// ... existing code ...this.electionTimer = time.timer(electionTimeoutMs);
// ... existing code ...this.epochElection = new EpochElection(voters.voterKeys());epochElection.recordVote(localId, true);}
// ... existing code ...
  1. 启动选举计时器this.electionTimer = time.timer(electionTimeoutMs);
  2. 初始化计票器this.epochElection = new EpochElection(voters.voterKeys());
  3. 给自己投票epochElection.recordVote(localId, true);

这完美地复现了 Raft 协议的规定:一旦成为候选人,立即开始计时,并首先给自己投一票。

投票管理 (recordGrantedVoterecordRejectedVote)

这两个方法用于记录从其他节点收到的投票结果。

// ... existing code ...@Overridepublic boolean recordGrantedVote(int remoteNodeId) {if (epochElection().isRejectedVoter(remoteNodeId)) {throw new IllegalArgumentException("Attempt to grant vote from node " + remoteNodeId +" which previously rejected our request");}return epochElection().recordVote(remoteNodeId, true);}@Overridepublic boolean recordRejectedVote(int remoteNodeId) {if (epochElection().isGrantedVoter(remoteNodeId)) {throw new IllegalArgumentException("Attempt to reject vote from node " + remoteNodeId +" which previously granted our request");}return epochElection().recordVote(remoteNodeId, false);}
// ... existing code ...

它们通过调用 epochElection 来更新票数。同时,它们包含健壮性检查,防止一个节点先投了赞成票又投反对票(或反之),确保投票的不可撤销性。

选举结果判断 (epochElection)

CandidateState 本身不直接判断选举结果,而是将这个任务委托给 epochElection 对象。外部的状态机(如 QuorumState)会通过调用 candidateState.epochElection().isVoteGranted() 来检查是否赢得了选举。

// ... existing code ...CandidateState candidateState = candidateStateOrThrow();if (!candidateState.epochElection().isVoteGranted())throw new IllegalStateException("Cannot become leader without majority votes granted");// ... transition to LeaderState ...LeaderState<T> state = new LeaderState<>(
// ... existing code ...

如果 isVoteGranted() 返回 trueQuorumState 就会将状态转换为 LeaderState

选举超时 (hasElectionTimeoutExpired)

这个方法检查 electionTimer 是否超时。如果超时,外部状态机将把状态从 Candidate 转换到 Prospective,然后通常会经过一个随机退避(backoff)时间后,再重新发起新一轮选举。

响应他人投票请求 (canGrantVote)

作为 Candidate,它已经把票投给了自己。那么它如何回应其他候选人的投票请求呢?

// ... existing code ...@Overridepublic boolean canGrantVote(ReplicaKey replicaKey,boolean isLogUpToDate,boolean isPreVote) {if (isPreVote && isLogUpToDate) {return true;}// Reject standard vote requests even if replicaId = localId, although the replica votes for// itself, this vote is implicit and not "granted".log.debug(
// ... existing code ...);return false;}
// ... existing code ...
  • 对于标准投票 (isPreVote = false)一律拒绝。因为在同一个任期 epoch 内,一个节点只能投一票,而它已经投给了自己。
  • 对于预投票 (isPreVote = true): 如果请求者的日志至少和自己一样新 (isLogUpToDate),则可以同意。这是因为预投票不改变任期号,也不会改变自己的投票承诺。同意预投票是一种合作行为,有助于集群更快地发现并选举出最合适的 Leader,避免因竞争导致选举超时。

状态转换与生命周期

类顶部的注释清晰地描述了 CandidateState 的生命周期:

  1. 开始: 发送投票请求,并记录响应。
  2. 选举成功: 如果获得多数赞成票 (epochElection.isVoteGranted()),则转换到 LeaderState
  3. 选举失败: 如果获得多数反对票 (epochElection.isVoteRejected()),则转换到 ProspectiveState,并进入退避阶段。
  4. 选举超时: 如果计时器超时 (electionTimer.isExpired()),则立即转换到 ProspectiveState

总结

CandidateState 是 KRaft 选举机制的核心驱动力。它是一个主动、有明确目标的临时状态。

  • 主动性: 与被动的 FollowerState 不同,CandidateState 主动发起选举,推动集群状态向前演进。
  • 封装性: 它良好地封装了作为候选人所需的所有逻辑,包括给自己投票、管理选举计时器、记录票数和决定如何响应其他投票请求。
  • 安全性: 它严格遵守 Raft 的投票规则(一任期一票),并通过 canGrantVote 的逻辑确保不会破坏选举的安全性。
  • 协作性: 通过对预投票的特殊处理,它又表现出一定的协作性,有助于提高选举效率。

理解了 CandidateState,就等于理解了 KRaft 集群在 Leader 失效后是如何发起自愈过程并选举出新领导者的。

EpochElection 

专门用于在 KRaft 选举期间为某个特定的任期(Epoch)跟踪和计算选票。它被 CandidateState 和 ProspectiveState 用来管理选举过程。

/***  Tracks the votes cast by voters in an election held by a Nominee.*/
public class EpochElection {
//...
}

EpochElection 的定位是一个计票器。在 Raft 协议中,当一个节点成为候选人(Candidate)并发起选举时,它需要:

  1. 知道总共有哪些投票人(Voters)。
  2. 记录每个投票人是投了赞成票、反对票,还是尚未投票。
  3. 实时判断自己是否已经获得了超过半数的赞成票(选举成功)。
  4. 实时判断自己是否已经收到了足够多的反对票,以至于不可能再获胜(选举失败)。

EpochElection 类就是为了解决这些问题而设计的。它将计票的复杂逻辑从 CandidateState 中剥离出来,使得 CandidateState 可以更专注于自身的状态转换,遵循了单一职责原则

在 EpochElection 这个类的上下文中,所有的“赞成”(granted)和“反对”(rejected)都是相对于发起这次选举的那个候选人(Candidate)而言的

让我们把这个概念放在 Raft 协议的流程中来理解:

  1. 选举开始: 节点 A 因为选举超时,决定成为候选人(Candidate)。它会创建一个 CandidateState 对象。

  2. 创建计票器: 在 CandidateState 的构造函数中,会立即创建一个 EpochElection 对象。这个 EpochElection 对象是专属于节点 A 在当前这个任期(Epoch)的选举的。

    // ... existing code ...
    protected CandidateState(// ...
    ) {// ...this.epochElection = new EpochElection(voters.voterKeys());// 候选人首先给自己投一票赞成票epochElection.recordVote(localId, true);
    }
    // ... existing code ...
    
  3. 发送投票请求: 节点 A 会向集群中其他所有投票人(Voters)发送 VoteRequest 消息,请求它们为自己投票。

  4. 接收并记录投票:

    • 当节点 A 收到来自节点 B 的 VoteResponse,表示同意投票时,节点 A 会调用 epochElection.recordVote(nodeB_id, true)。这里的 true 意味着“节点 B 赞成我成为 Leader”。
    • 当节点 A 收到来自节点 C 的 VoteResponse,表示拒绝投票时,节点 A 会调用 epochElection.recordVote(nodeC_id, false)。这里的 false 意味着“节点 C 反对我成为 Leader”。
  5. 判断结果:

    • epochElection.isVoteGranted() 判断的是:“赞成我成为 Leader 的票数是否过半?”
    • epochElection.isVoteRejected() 判断的是:“反对我成为 Leader 的票数是否已经多到让我不可能获胜了?”

核心数据结构 (VoterState)

EpochElection 的核心是内部私有类 VoterState,它为每一个投票人维护一个状态。

// ... existing code ...private static final class VoterState {private final ReplicaKey replicaKey;private State state = State.UNRECORDED;// ... existing code ...enum State {UNRECORDED,GRANTED,REJECTED}
// ... existing code ...}
}
  • VoterState 封装了每个投票人的 ReplicaKey(ID 和目录 ID)和投票状态 state
  • State 是一个枚举,有三种可能的值:
    • UNRECORDED: 未收到投票,这是初始状态。
    • GRANTED: 已投赞成票。
    • REJECTED: 已投反对票。

EpochElection 类中持有一个 Map<Integer, VoterState>,通过投票人的 ID 快速查找其投票状态。

// ... existing code ...
public class EpochElection {private Map<Integer, VoterState> voterStates;
// ... existing code ...
}

构造与初始化

EpochElection 在构造时接收一个包含所有投票人(Voters)的 Set<ReplicaKey>

// ... existing code ...public EpochElection(Set<ReplicaKey> voters) {this.voterStates = voters.stream().collect(Collectors.toMap(ReplicaKey::id,VoterState::new));}
// ... existing code ...

它会遍历这个集合,为每个投票人创建一个 VoterState 对象,并以投票人 ID 为键,存入 voterStates 这个 Map 中。此时,所有 VoterState 的内部状态都是默认的 UNRECORDED

记录选票 (recordVote)

这是更新计票结果的入口。

// ... existing code ...public boolean recordVote(int voterId, boolean isGranted) {VoterState voterState = getVoterStateOrThrow(voterId);boolean wasUnrecorded = voterState.state == VoterState.State.UNRECORDED;voterState.setState(isGranted ? VoterState.State.GRANTED : VoterState.State.REJECTED);return wasUnrecorded;}
// ... existing code ...
  • 它接收 voterId 和一个布尔值 isGrantedtrue 代表赞成,false 代表反对)。
  • 它会更新对应 VoterState 的状态。
  • 返回值非常关键: 它返回 true 当且仅当这是第一次记录该投票人的投票(即之前的状态是 UNRECORDED)。如果重复记录同一个人的投票,会更新状态但返回 false

recordVote 的实现允许一个投票从 GRANTED 变为 REJECTED。这看起来似乎不安全,因为 Raft 协议规定在一个任期内,一个节点只能投一次票。

这是分层设计的体现:

  • EpochElection 作为一个底层的计票器,它的职责很简单:忠实记录收到的最新投票状态。
  • 而**保证“一个 voter 在一轮选举中不能改变主意”**这个业务规则的责任,交给了它的调用者——CandidateState。在 CandidateState 中有检查逻辑,如果一个已经投了赞成票的节点又发来反对票,会直接抛出异常,而不会去调用 epochElection.recordVote。

判断选举是否胜利 (isVoteGranted)

这是判断选举是否成功的核心方法。

// ... existing code ...public boolean isVoteGranted() {return numGranted() >= majoritySize();}private long numGranted() {return votersOfState(VoterState.State.GRANTED).count();}private int majoritySize() {return voterStates.size() / 2 + 1;}
// ... existing code ...

逻辑非常清晰:

  1. 计算获得赞成票(GRANTED)的数量 numGranted()
  2. 计算赢得选举需要的多数票数量 majoritySize(),即 (总票数 / 2) + 1
  3. 如果赞成票数大于或等于多数票,返回 true,表示选举成功。

判断选举是否失败 (isVoteRejected)

这是一个重要的优化,让候选人能尽早知道选举失败,而无需等待超时。

// ... existing code ...public boolean isVoteRejected() {return numGranted() + numUnrecorded() < majoritySize();}private long numUnrecorded() {return votersOfState(VoterState.State.UNRECORDED).count();}
// ... existing code ...

这个逻辑稍微有些绕,但非常精妙。它的意思是: “已获得的赞成票数” + “尚未投票的票数” < “赢得选举需要的多数票数”

换句话说,即使所有尚未投票的人都投赞成票,总赞成票数也无法达到多数。在这种情况下,选举已经不可能获胜了,可以判定为失败。

从测试用例中可以看到这个逻辑:在一个3节点的集群中,当收到2个反对票时,isVoteRejected 变为 true

// ... existing code ...// recording majority as rejectedassertTrue(epochElection.recordVote(voter1 + 1, false));
// ... existing code ...assertEquals(2, epochElection.rejectingVoters().size());
// ... existing code ...assertFalse(epochElection.isVoteGranted());assertTrue(epochElection.isVoteRejected());
// ... existing code ...

查询投票者状态

EpochElection 还提供了一系列方法来查询不同状态的投票人集合,这对于日志记录和调试非常有用。

  • grantingVoters(): 返回所有投了赞成票的投票人 ID。
  • rejectingVoters(): 返回所有投了反对票的投票人 ID。
  • unrecordedVoters(): 返回所有尚未投票的投票人。

总结

EpochElection 是一个设计精良的辅助类,它体现了软件工程中的多个优秀实践:

  • 单一职责: 它只做一件事——计票,并且做得很好。这使得调用它的 CandidateState 代码更简洁、更关注于状态机本身。
  • 封装性: 它将计票的内部实现(VoterStateMap 等)完全隐藏,只对外暴露清晰、易于理解的接口(recordVoteisVoteGrantedisVoteRejected)。
  • 逻辑清晰: 无论是判断选举成功还是提前判断选举失败,其核心逻辑都直接且高效,完美地实现了 Raft 协议对选举计票的要求。
  • 优化isVoteRejected 方法是一个很好的例子,它通过提前判断失败来避免不必要的等待,提高了系统在选举竞争激烈时的恢复速度。

通过 EpochElection,KRaft 的选举实现变得更加模块化、健壮和易于理解。

LeaderState

LeaderState 实现了 EpochState 接口,代表一个节点在特定任期(Epoch)内作为 Leader 的状态。一旦一个 Candidate 获得多数选票,它就会转换到 LeaderState

Leader 的核心职责是:

  1. 处理客户端请求: 接收来自客户端的写请求,将它们作为日志条目(Log Entry)写入自己的日志中。
  2. 日志复制: 将新的日志条目通过 AppendEntries RPC(在 Kafka 中对应 Fetch 响应)复制给所有的 Follower。
  3. 推进高水位(High Watermark): 当一个日志条目被复制到大多数节点上时,Leader 就认为该条目是“已提交”(Committed)的,并更新高水位。
  4. 维持活性: Leader 需要周期性地向所有 Follower 发送心跳(空的 AppendEntries RPC)来证明自己的存活,并防止 Follower 因选举超时而发起新的选举。
  5. 管理集群成员变更: 安全地增加或移除集群中的节点。

LeaderState 拥有大量字段来维护其复杂的状态。

// ... existing code ...
public class LeaderState<T> implements EpochState {
// ... existing code ...private final VoterSet.VoterNode localVoterNode;private final int epoch;private final long epochStartOffset;private final Set<Integer> grantingVoters;private final VoterSet voterSetAtEpochStart;
// ... existing code ...private Optional<LogOffsetMetadata> highWatermark = Optional.empty();private Map<Integer, ReplicaState> voterStates = new HashMap<>();
// ... existing code ...private final Map<ReplicaKey, ReplicaState> observerStates = new HashMap<>();private final Logger log;private final BatchAccumulator<T> accumulator;// The set includes all the followers voters that FETCH or FETCH_SNAPSHOT during the current checkQuorumTimer interval.private final Set<Integer> fetchedVoters = new HashSet<>();private final Timer checkQuorumTimer;private final int checkQuorumTimeoutMs;private final Timer beginQuorumEpochTimer;
// ... existing code ...// This is volatile because resignation can be requested from an external thread.private volatile boolean resignRequested = false;
// ... existing code ...
}
  • epochlocalVoterNodeepochStartOffset: Leader 的基本信息:当前任期、自己的节点信息、任期开始时的日志位移。
  • voterStatesobserverStates: 这两个 Map 是 Leader 管理集群的核心。它们为每一个 Follower(包括 Voter 和 Observer)维护一个 ReplicaState 对象,用于跟踪该 Follower 的日志同步进度、最后一次通信时间等信息。
  • highWatermark高水位标记。这是 Raft 安全性的基石。只有位移小于等于高水位的日志才被认为是“已提交”的,可以被应用到状态机。
  • accumulator: 一个批处理累加器。Leader 将客户端的写请求先放入这个累加器中,然后批量写入本地日志并发送给 Follower,以提高吞吐量。
  • checkQuorumTimercheckQuorumTimeoutMs法定人数检查计时器。这是 Leader 的“生命线”。Leader 必须在 checkQuorumTimeoutMs 时间内收到大多数 Follower 的心跳(Fetch 请求)。如果超时,说明 Leader 可能与集群的大多数节点失联,必须主动下台。
  • fetchedVoters: 一个集合,用于在 checkQuorumTimer 的一个周期内,记录哪些 Follower 已经发来过 Fetch 请求。
  • beginQuorumEpochTimer: 用于向尚未确认新 Leader 的 Follower 发送 BeginQuorumEpoch 请求。
  • addVoterHandlerStateremoveVoterHandlerState: 用于处理动态成员变更(增加/移除 Voter)时的状态。
  • resignRequested: 一个 volatile 标志,允许从外部线程安全地请求 Leader 主动辞职。

构造与初始化

当一个节点从 CandidateState 转换到 LeaderState 时,会调用其构造函数进行初始化。

// ... existing code ...protected LeaderState(Time time,VoterSet.VoterNode localVoterNode,int epoch,long epochStartOffset,VoterSet voterSetAtEpochStart,OptionalLong offsetOfVotersAtEpochStart,KRaftVersion kraftVersionAtEpochStart,Set<Integer> grantingVoters,BatchAccumulator<T> accumulator,int fetchTimeoutMs,LogContext logContext,KafkaRaftMetrics kafkaRaftMetrics) {
// ... existing code ...for (VoterSet.VoterNode voterNode: voterSetAtEpochStart.voterNodes()) {boolean hasAcknowledgedLeader = voterNode.isVoter(localVoterNode.voterKey());this.voterStates.put(voterNode.voterKey().id(),new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, voterNode.listeners()));}
// ... existing code ...// use the 1.5x of fetch timeout to tolerate some network transition time or other IO time.this.checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR);this.checkQuorumTimer = time.timer(checkQuorumTimeoutMs);this.beginQuorumEpochTimeoutMs = fetchTimeoutMs / 2;this.beginQuorumEpochTimer = time.timer(0);
// ... existing code ...}
// ... existing code ...

关键初始化步骤:

  1. 初始化 Follower 状态: 遍历当前所有的 Voter,为它们创建 ReplicaState 对象并存入 voterStates。注意,Leader 自己也被视为一个已确认的 Voter。
  2. 设置 Quorum 检查超时checkQuorumTimeoutMs 通常设置为 fetchTimeoutMs 的 1.5 倍,提供一定的网络抖动容忍度。并启动 checkQuorumTimer
  3. 重置高水位: 一个新 Leader 当选后,它不能继承旧的 highWatermark。它必须通过与 Follower 的交互重新建立 highWatermark,以保证其单调性。

日志复制与高水位(High Watermark)推进

Leader 通过 updateReplicaState 方法更新 Follower 的日志同步进度。当收到 Follower 的 Fetch 请求后,Leader 会知道该 Follower 已经复制到了哪个位移。

高水位的计算是 Leader 的核心安全职责。Leader 会在每次有 Follower 的日志位移更新时,重新计算高水位。其逻辑是:将所有 Voter(包括 Leader 自己)的已知日志位移进行排序,取中间位置的那个位移作为新的高水位。例如,在一个5节点的集群中,高水位就是所有节点位移中第 (5/2 + 1) = 3 大的那个值。这确保了高水位标记的日志条目一定存在于多数节点上。

法定人数检查(Quorum Check)与活性维持

这是 Leader 的“保活”机制,通过 checkQuorumTimer 实现。

// ... existing code ...public void updateCheckQuorumForFollowingVoter(ReplicaKey replicaKey, long currentTimeMs) {updateFetchedVoters(replicaKey);// The majority number of the voters. Ex: 2 for 3 voters, 3 for 4 voters... etc.int majority = (voterStates.size() / 2) + 1;// If the leader is in the voter set, it should be implicitly counted as part of the// majority, but the leader will never be a member of the fetchedVoters.// If the leader is not in the voter set, it is not in the majority. Then, the// majority can only be composed of fetched voters.if (voterStates.containsKey(localVoterNode.voterKey().id())) {majority = majority - 1;}if (fetchedVoters.size() >= majority) {fetchedVoters.clear();checkQuorumTimer.update(currentTimeMs);checkQuorumTimer.reset(checkQuorumTimeoutMs);}}
// ... existing code ...
  1. 当 Leader 收到一个 Follower 的 Fetch 请求时,调用 updateCheckQuorumForFollowingVoter
  2. 该 Follower 的 ID 被加入 fetchedVoters 集合。
  3. 计算出达成多数需要的 Follower 数量(总数/2 + 1,再减去 Leader 自己)。
  4. 如果 fetchedVoters 的数量达到了这个多数,说明 Leader 与大多数 Follower 通信正常。此时,清空 fetchedVoters 集合,并重置 checkQuorumTimer
  5. 如果计时器在重置前就超时了(通过 timeUntilCheckQuorumExpires 判断),Leader 就会退位。

集群成员变更(Reconfiguration)

Leader 负责协调集群成员的变更。当收到增加/移除 Voter 的请求时,它会创建 AddVoterHandlerState 或 RemoveVoterHandlerState 来管理这个过程,这通常是一个两阶段的过程,以确保变更的安全性。

主动辞职(Resignation)

通过调用 requestResign() 方法,可以将 resignRequested 标志位设为 true。Leader 在其主循环中会检查这个标志,如果为 true,它会尝试将领导权平滑地转移给一个日志最全的 Follower,然后自己退位。

状态转换

Leader 状态不是永久的。在以下情况下,Leader 会退位并转换到 FollowerState

  1. 失去法定人数(Quorum)checkQuorumTimer 超时,表明 Leader 与大多数 Follower 失联。
  2. 发现更高任期: 收到来自任何节点(Candidate 或其他 Leader)的、带有更高 epoch 的消息。这是 Raft 协议的核心规则,确保系统中永远只有一个合法的 Leader。
  3. 主动辞职resignRequested 被触发。

总结

LeaderState 是 KRaft 协议的“引擎”。它是一个高度复杂但职责清晰的状态,体现了分布式共识协议的核心思想:

  • 中心化协调: Leader 作为唯一的写入口和协调者,简化了系统的设计。
  • 安全性: 通过严格的高水位计算和任期规则,保证了已提交日志的不可逆转性。
  • 活性(Liveness): 通过心跳和 Quorum 检查机制,确保当 Leader 失效或网络分区时,集群能够及时发现并选举出新的 Leader,保证服务的可用性。
  • 可扩展性: 封装了动态成员变更的逻辑,使得集群可以在线扩缩容。

理解了 LeaderState 的工作原理,就掌握了 KRaft 协议最核心的运作机制。

UnattachedStateResignedState 和 ProspectiveState 

这三个在 KRaft 中非常重要的瞬时或过渡状态。它们虽然不像 FollowerStateCandidateState 和 LeaderState 那样是 Raft 协议的经典状态,但在 Kafka 的实现中起到了关键的粘合与缓冲作用。

ProspectiveState (预备/勘探状态)

ProspectiveState 是 Raft 选举过程中的一个前置状态,位于 FollowerState 和 CandidateState 之间。它的引入主要是为了解决 Raft 协议中的一个潜在问题:扰乱性选举(Disruptive Elections)

作用与目的:

  1. 预投票(Pre-Vote)阶段: 这是 ProspectiveState 的核心职责。当一个节点(比如节点 A)的选举计时器超时,它不会立即进入 CandidateState 并增加自己的任期号(epoch)。相反,它会进入 ProspectiveState,并以当前的任期号向其他节点发送“预投票”请求(Pre-Vote Request)。

  2. 避免不必要的选举:

    • 场景: 假设节点 A 只是暂时与 Leader 网络隔离,但 Leader 和集群的大多数节点通信正常。如果没有 ProspectiveState,节点 A 会立即成为 Candidate,增加任期号,并发起一轮正式选举。这会导致正在正常工作的 Leader 收到更高任期的投票请求而下台,从而对整个集群造成不必要的干扰。
    • 有了 ProspectiveState: 节点 A 发送的预投票请求会被其他节点拒绝,因为它无法证明自己的日志比其他节点更新。收到多数拒绝后,节点 A 就知道自己不适合发起选举,会重新回到 FollowerState 或 UnattachedState,等待 Leader 的心跳。这样就避免了一次无效且具有破坏性的选举。
  3. 状态转换路径:

    • Follower/Unattached -> Prospective: 当选举计时器超时。
    • Prospective -> Candidate: 如果收到了大多数节点的预投票赞成票,证明自己有很大希望能赢得选举,此时才会正式转换到 CandidateState,增加任期号,并发起正式投票。
    • Prospective -> Follower/Unattached: 如果预投票被大多数节点拒绝,或者在预投票期间收到了合法 Leader 的消息,则会放弃选举,转换回 Follower 或 Unattached 状态。

// ... existing code ...
public class ProspectiveState implements NomineeState {
// ... existing code .../*** The lifetime of a prospective state is the following.** 1. Once started, it will send prevote requests and keep record of the received vote responses* 2. If it receives a message denoting a leader with a higher epoch, it will transition to follower state.* 3. If majority votes granted, it will transition to candidate state.* 4. If majority votes rejected or election times out, it will transition to unattached or follower state*    depending on if it knows the leader id and endpoints or not*/
// ... existing code ...
}

这段注释清晰地描述了 ProspectiveState 的生命周期和作用。

UnattachedState (未连接/游离状态)

UnattachedState 是一个非常基础的初始状态或回退状态。当一个节点启动时,或者当它与集群失去联系(比如不知道当前的 Leader 是谁)时,就会进入这个状态。

作用与目的:

  1. 启动时的初始状态: 当一个 Raft 节点刚启动时,它不知道集群的当前状态(谁是 Leader,任期号是多少)。此时它就处于 UnattachedState,像一个“局外人”一样,等待集群中其他节点的消息。

  2. 失去 Leader 后的回退状态: 如果一个 Follower 长时间没有收到 Leader 的心跳,它的选举计时器会超时。在进入 ProspectiveState 之前,它可能会先短暂进入 UnattachedState,表示“我不知道 Leader 是谁了”。

  3. 等待信息: 在 UnattachedState 下,节点非常被动。它主要做的事情就是等待:

    • 等待其他节点发来的 VoteRequest,然后根据规则决定是否投票。
    • 等待 Leader 发来的 BeginQuorumEpoch 或 Fetch 响应,一旦收到,就能知道新的 Leader 和任期,并转换到 FollowerState
    • 自己的选举计时器超时,然后转换到 ProspectiveState 尝试发起选举。

总结UnattachedState 是一个“信息不足”的状态,节点在此状态下无法参与日志复制,只能被动地等待或主动发起选举来重新融入集群。

ResignedState (已辞职状态)

ResignedState 是 Leader 主动下台后进入的一个短暂的过渡状态。

作用与目的:

  1. 平滑的领导权交接: 当 Leader 决定辞职时(例如,管理员触发了下线操作,或者它发现自己不再是 Voter),它不会立即变成 Follower。它会先向所有 Follower 发送一个 EndQuorumEpoch 请求,通知它们“我不再是 Leader 了,你们可以开始新的选举了”。

  2. 进入 ResignedState: 发送完 EndQuorumEpoch 请求后,Leader 会立即进入 ResignedState

  3. 只读和拒绝服务: 在 ResignedState 下,该节点会:

    • 拒绝所有新的写请求。
    • 继续响应 Follower 的 Fetch 请求,但不会再有新的日志条目。
    • 拒绝其他候选人的投票请求,因为它知道一轮新的选举即将开始。
  4. 等待新 LeaderResignedState 的主要目的是等待。一旦它收到了来自新选举产生的 Leader 的消息,它就会转换到 FollowerState,成为新 Leader 的一个普通 Follower。

总结ResignedState 是一个优雅下台的机制。它确保了 Leader 的辞职是一个主动、可控的过程,而不是一个突然的崩溃,从而让集群能够更平稳、快速地完成领导权交接。


整体关系

这三个状态可以看作是标准 Raft 状态机(Follower, Candidate, Leader)的“辅助轮”和“缓冲带”,使得 KRaft 的状态转换更加健壮和优雅:

  • UnattachedState 是起点迷失点
  • ProspectiveState 是从 Follower 到 Candidate 的审慎的跳板,防止鲁莽的选举。
  • ResignedState 是从 Leader 到 Follower 的平滑的滑梯,确保优雅的退位。

http://www.lryc.cn/news/592801.html

相关文章:

  • Redis学习其二(事务,SpringBoot整合,持久化RDB和AOF)
  • java基础——面向对象04(继承)
  • 通俗易懂:什么是决策树?
  • STM32-第七节-TIM定时器-3(输入捕获)
  • STL—— list迭代器封装的底层讲解
  • 小白学Python,网络爬虫篇(2)——selenium库
  • 2025年Flutter开发主流技术栈
  • Windows发现可疑的svchost程序
  • 怎么自己搭建云手机
  • Hive 向量化执行引擎 Vectorized Execution 常见 NPE 报错分析及解决
  • 域名WHOIS信息查询免费API使用指南
  • HIVE实战处理(二十四)留存用户数
  • 专题:2025智能体研究报告|附70份报告PDF、原数据表汇总下载
  • 线程控制:互斥与同步
  • math.h函数
  • 深度学习零基础入门(3)-图像与神经网络
  • 需求变更频繁?构建动态估算机制四大要点
  • 短视频矩阵系统:选择与开发的全面指南
  • nastools继任者?极空间部署影视自动化订阅系统『MediaMaster』
  • 代理模式及优化
  • 解锁时序数据库选型密码,为何国产开源时序数据库IoTDB脱颖而出?
  • 脉冲神经网络(Spiking Neural Network, SNN)与知识蒸馏(Knowledge Distillation, KD)
  • Vue3 Anime.js超级炫酷的网页动画库详解
  • Kubernetes (k8s)、Rancher 和 Podman 的异同点分析
  • Jmeter系列(6)-测试计划
  • 网关-微服务网关实现
  • Postman/Apipost中使用Post URL编码发送含换行符参数的问题分析
  • vue2 面试题及详细答案150道(101 - 120)
  • 智慧后厨检测算法构建智能厨房防护网
  • Redis学习其三(订阅发布,主从复制,哨兵模式)