当前位置: 首页 > news >正文

Raft 代码分析

核心RPC服务分类

在 Raft 协议中,RPC(Remote Procedure Call) 是一种远程过程调用机制,用于不同节点之间进行通信。

业务应用RPC服务 

        ExampleService 是一个 RPC 服务接口,它定义了两个操作:set 和 get,分别用于数据的写入和读取。这类接口通常用于定义服务端提供的业务功能,并通过 RPC 技术实现客户端与服务端的通信。

接口定义:

public interface ExampleService {ExampleProto.SetResponse set(ExampleProto.SetRequest request);ExampleProto.GetResponse get(ExampleProto.GetRequest request);
}

set(SetRequest):数据写入操作,用于将某些数据存储到服务器中。
get(GetRequest):数据读取操作,用于从服务器获取数据。

Raft节点间通信RPC服务
        在 Raft 协议中,节点之间的通信通常是通过 RPC(远程过程调用)来实现的。这些 RPC 服务用于处理选举、日志复制、快照同步等关键操作。
public interface RaftConsensusService {RaftProto.VoteResponse preVote(RaftProto.VoteRequest request);RaftProto.VoteResponse requestVote(RaftProto.VoteRequest request);RaftProto.AppendEntriesResponse appendEntries(RaftProto.AppendEntriesRequest request);RaftProto.InstallSnapshotResponse installSnapshot(RaftProto.InstallSnapshotRequest request);
}

preVote(VoteRequest request)
作用:preVote 方法是 Raft 协议中的 预投票 RPC。它用于候选者在开始正式选举之前向其他节点请求投票。预投票的目的是减少因网络延迟或部分节点故障而导致的选举冲突。候选者会在收到预投票时验证是否可以获得投票。
响应:VoteResponse 表示预投票的结果。

requestVote(VoteRequest request)
作用:requestVote 方法是 Raft 协议中的 正式投票 RPC。候选者在选举过程中会向其他节点请求投票。如果节点尚未投票且候选者的日志比较新(符合 Raft 协议的条件),该节点会返回投票给候选者。
响应:VoteResponse 表示正式投票的结果。

appendEntries(AppendEntriesRequest request)
作用:appendEntries 方法是 Raft 协议中的 日志复制和心跳 RPC。当选举完成后,领导者需要向其他节点复制日志条目以保证日志一致性。即使没有日志复制的任务,领导者也会定期向所有跟随者发送心跳信号(空的 AppendEntriesRequest)来维持领导者的地位。
响应:AppendEntriesResponse 表示日志复制或心跳的结果。

installSnapshot(InstallSnapshotRequest request)
作用:installSnapshot 方法是 Raft 协议中的 快照安装 RPC。由于 Raft 协议要求保持节点日志的一致性,节点的日志会不断增长,导致存储开销增大。为了减少日志的大小,Raft 支持通过快照来压缩日志。快照是节点状态的完整副本,通过此 RPC 安装快照。
响应:InstallSnapshotResponse 表示快照安装的结果。

客户端管理RPC服务

        RaftClientService 接口定义了与 Raft 集群通信的客户端管理 RPC 服务。这个接口的主要功能是获取集群的领导节点信息、获取集群的配置、以及管理集群节点(添加或删除节点)

public interface RaftClientService {RaftProto.GetLeaderResponse getLeader(RaftProto.GetLeaderRequest request);RaftProto.GetConfigurationResponse getConfiguration(RaftProto.GetConfigurationRequest request);RaftProto.AddPeersResponse addPeers(RaftProto.AddPeersRequest request);RaftProto.RemovePeersResponse removePeers(RaftProto.RemovePeersRequest request);
}

getLeader(GetLeaderRequest request)
作用:该方法用于获取当前 Raft 集群的领导节点(Leader)信息。通常在集群中,只有 Leader 节点可以接受客户端的请求,并进行数据的写入。
请求:GetLeaderRequest 可以是一个简单的请求类型,不需要太多的参数。
响应:GetLeaderResponse 返回 Leader 节点的相关信息(如 Leader 的地址、ID )

getConfiguration(GetConfigurationRequest request)
作用:该方法用于获取当前 Raft 集群的配置信息,包括集群中所有节点的状态(哪些是 Leader、哪些是 Follower、哪些是 Candidate 等)。它还可以返回每个节点的地址以及它们在集群中的角色。
请求:GetConfigurationRequest 是一个获取配置信息的请求。
响应:GetConfigurationResponse 返回集群中各个节点的信息。

addPeers(AddPeersRequest request)
作用:该方法用于向 Raft 集群中添加一个新的节点。这个请求通常由 Leader 节点发起,因为 Leader 负责管理集群成员的变动。
请求:AddPeersRequest 包含要添加的节点的信息(比如节点的地址、角色等)
响应:AddPeersResponse 表示添加操作是否成功。

removePeers(RemovePeersRequest request) 
作用:该方法用于从 Raft 集群中删除一个节点。如果节点失效或者需要进行集群扩容/缩容,可以使用此方法删除节点。
请求:RemovePeersRequest 包含要删除的节点的信息。
响应:RemovePeersResponse 表示删除操作是否成功。

客户端写操作RPC调用链

客户端发起写请求

入口: ClientMain.java → exampleService.set(setRequest)
ClientMain 是客户端的主入口类,负责从命令行接收参数、初始化 RPC 客户端、发起请求并打印响应结果。

ClientMain代码如下:

public class ClientMain {public static void main(String[] args) {if (args.length < 2) {System.out.printf("Usage: ./run_client.sh CLUSTER KEY [VALUE]\n");System.exit(-1);}// parse argsString ipPorts = args[0];String key = args[1];String value = null;if (args.length > 2) {value = args[2];}// init rpc clientRpcClient rpcClient = new RpcClient(ipPorts);ExampleService exampleService = BrpcProxy.getProxy(rpcClient, ExampleService.class);final JsonFormat jsonFormat = new JsonFormat();// setif (value != null) {ExampleProto.SetRequest setRequest = ExampleProto.SetRequest.newBuilder().setKey(key).setValue(value).build();ExampleProto.SetResponse setResponse = exampleService.set(setRequest);System.out.printf("set request, key=%s value=%s response=%s\n",key, value, jsonFormat.printToString(setResponse));} else {// getExampleProto.GetRequest getRequest = ExampleProto.GetRequest.newBuilder().setKey(key).build();ExampleProto.GetResponse getResponse = exampleService.get(getRequest);System.out.printf("get request, key=%s, response=%s\n",key, jsonFormat.printToString(getResponse));}rpcClient.stop();}
}

下面分析ClientMain代码

客户端启动过程
命令行参数解析:
客户端从命令行接收集群信息(ipPorts)、键(key)和值(value)作为参数。
如果没有提供值(value),客户端会执行一个 get 操作,否则执行 set 操作。

String ipPorts = args[0];   // 集群的 IP 和端口
String key = args[1];       // 请求的键
String value = null;
if (args.length > 2) {value = args[2];        // 请求的值
}

初始化 RPC 客户端:
创建一个 RpcClient 实例并指定服务器集群的 IP 和端口(ipPorts)
使用 BrpcProxy.getProxy 来生成代理对象 exampleService,它会向 ExampleService 接口发送 RPC 请求。

RpcClient rpcClient = new RpcClient(ipPorts);
ExampleService exampleService = BrpcProxy.getProxy(rpcClient, ExampleService.class);

构建请求:
如果有 value 参数,构造一个 SetRequest 请求,发送 set 请求给服务器。
如果没有 value,构造一个 GetRequest 请求,发送 get 请求。

ExampleProto.SetRequest setRequest = ExampleProto.SetRequest.newBuilder().setKey(key).setValue(value).build();

发送请求并打印响应:
调用 exampleService.set(setRequest),发起 RPC 请求。
打印响应信息。

ExampleProto.SetResponse setResponse = exampleService.set(setRequest);
System.out.printf("set request, key=%s value=%s response=%s\n",key, value, jsonFormat.printToString(setResponse));

停止客户端:
在操作完成后,停止 RPC 客户端。

rpcClient.stop();

业务服务层处理

ExampleServiceImpl.java → set()方法

ExampleServiceImpl 类中的 set 方法负责处理客户端发起的写请求。该方法根据当前节点的角色(是否为领导者)来决定如何处理请求,并与 Raft 集群中的其他节点进行交互。

set方法代码如下:

@Override
public ExampleProto.SetResponse set(ExampleProto.SetRequest request) {ExampleProto.SetResponse.Builder responseBuilder = ExampleProto.SetResponse.newBuilder();// 如果自己不是leader,将写请求转发给leaderif (raftNode.getLeaderId() <= 0) {responseBuilder.setSuccess(false);  // 没有领导者} else if (raftNode.getLeaderId() != raftNode.getLocalServer().getServerId()) {// 当前节点不是领导者,转发请求给领导者onLeaderChangeEvent();ExampleService exampleService = BrpcProxy.getProxy(leaderRpcClient, ExampleService.class);ExampleProto.SetResponse responseFromLeader = exampleService.set(request);responseBuilder.mergeFrom(responseFromLeader);  // 合并领导者的响应} else {// 当前节点是领导者,执行数据同步byte[] data = request.toByteArray();  // 将请求数据序列化boolean success = raftNode.replicate(data, RaftProto.EntryType.ENTRY_TYPE_DATA);  // 将数据同步到集群responseBuilder.setSuccess(success);  // 设置成功标志}// 构建响应并返回ExampleProto.SetResponse response = responseBuilder.build();LOG.info("set request, request={}, response={}", jsonFormat.printToString(request),jsonFormat.printToString(response));return response;
}

下面分析 set 方法

1. 判断是否是领导者节点
首先,方法判断当前节点是否为 Raft 集群的领导者:

if (raftNode.getLeaderId() <= 0) {responseBuilder.setSuccess(false);  // 没有领导者
}

如果 raftNode.getLeaderId() 小于等于 0,表示当前没有有效的领导者节点,方法返回失败。

2. 转发请求给领导者
如果当前节点不是领导者,方法将请求转发给 Raft 集群中的领导者进行处理:

else if (raftNode.getLeaderId() != raftNode.getLocalServer().getServerId()) {onLeaderChangeEvent();  // 通知领导者发生变化ExampleService exampleService = BrpcProxy.getProxy(leaderRpcClient, ExampleService.class);ExampleProto.SetResponse responseFromLeader = exampleService.set(request);responseBuilder.mergeFrom(responseFromLeader);  // 合并领导者的响应
}

raftNode.getLeaderId() 获取当前领导者的 ID,raftNode.getLocalServer().getServerId() 获取当前节点的 ID。
如果当前节点不是领导者,调用 onLeaderChangeEvent() 方法来处理领导者变更事件(更新领导者信息)
使用 BRPC 框架代理 ExampleService,向领导者节点发送 set 请求,并将领导者的响应合并到当前响应中。

3. 领导者节点处理数据同步
如果当前节点是领导者,方法将数据同步到 Raft 集群中的所有节点:

else {byte[] data = request.toByteArray();  // 将请求数据序列化boolean success = raftNode.replicate(data, RaftProto.EntryType.ENTRY_TYPE_DATA);  // 将数据同步到集群responseBuilder.setSuccess(success);  // 设置成功标志
}

将请求数据序列化为字节数组,调用 raftNode.replicate() 方法将数据同步到 Raft 集群。
RaftProto.EntryType.ENTRY_TYPE_DATA 表示这是一条普通的数据写入操作。
如果同步成功,返回成功状态。

4. 返回响应
无论是哪种情况,方法最后都构建并返回一个 SetResponse 响应:

ExampleProto.SetResponse response = responseBuilder.build();
LOG.info("set request, request={}, response={}", jsonFormat.printToString(request),jsonFormat.printToString(response));
return response;

responseBuilder.build() 用于生成最终的 SetResponse 对象。
通过 LOG.info 打印请求和响应的日志信息,以便调试和记录。

ExampleProto.SetResponse.Builder 与 ExampleProto.SetResponse 的区别

在 Protobuf 中,SetResponse 是一个不可变的消息对象,它的实例不能直接修改。为了能够构建和修改 SetResponse 对象,Protobuf 提供了一个构建器(Builder)类。
ExampleProto.SetResponse
SetResponse 是最终的、不可变的响应对象,表示 Protobuf 消息的完整实例。
一旦创建,SetResponse 的字段值就不能再改变。
它是通过 Builder 类构建出来的,通常用来在处理完请求后,返回给调用者的响应结果。

ExampleProto.SetResponse.Builder
Builder 是 SetResponse 类的内部构建器类,用于动态地构建 SetResponse 实例。
使用 Builder 类可以设置 SetResponse 中的字段(例如 setSuccess())
Builder 允许修改和设置字段值,直到完成所有的设置后,通过 build() 方法将其转换为一个不可变的 SetResponse 实例。

Raft核心复制逻辑

RaftNode.java → replicate()方法

replicate() 方法是 Raft 协议中关键的日志复制逻辑部分,它在领导者节点上执行,负责将新的日志条目复制到集群中的其他节点。

replicate 方法代码如下:

public boolean replicate(byte[] data, RaftProto.EntryType entryType) {lock.lock();long newLastLogIndex = 0;try {if (state != NodeState.STATE_LEADER) {LOG.debug("I'm not the leader");return false;}RaftProto.LogEntry logEntry = RaftProto.LogEntry.newBuilder().setTerm(currentTerm).setType(entryType).setData(ByteString.copyFrom(data)).build();List<RaftProto.LogEntry> entries = new ArrayList<>();entries.add(logEntry);newLastLogIndex = raftLog.append(entries);// raftLog.updateMetaData(currentTerm, null, raftLog.getFirstLogIndex());for (RaftProto.Server server : configuration.getServersList()) {final Peer peer = peerMap.get(server.getServerId());executorService.submit(() -> appendEntries(peer));}if (raftOptions.isAsyncWrite()) {// 主节点写成功后,就返回。return true;}// sync wait commitIndex >= newLastLogIndexlong startTime = System.currentTimeMillis();while (lastAppliedIndex < newLastLogIndex) {if (System.currentTimeMillis() - startTime >= raftOptions.getMaxAwaitTimeout()) {break;}commitIndexCondition.await(raftOptions.getMaxAwaitTimeout(), TimeUnit.MILLISECONDS);}} catch (Exception ex) {ex.printStackTrace();} finally {lock.unlock();}LOG.debug("lastAppliedIndex={} newLastLogIndex={}", lastAppliedIndex, newLastLogIndex);if (lastAppliedIndex < newLastLogIndex) {return false;}return true;
}

下面分析 replicate 方法

replicate() 方法的主要功能是:
将客户端的写请求作为新的日志条目添加到 Raft 日志中。
将日志条目异步地复制到集群中的其他节点。
如果 isAsyncWrite() 配置为 false,则同步等待日志复制到大多数节点并且日志提交。

public boolean replicate(byte[] data, RaftProto.EntryType entryType) {lock.lock();  // 获取锁,确保线程安全long newLastLogIndex = 0;try {// 确保当前节点是领导者if (state != NodeState.STATE_LEADER) {LOG.debug("I'm not the leader");return false;}

lock.lock():方法首先获取一个锁,确保在执行日志复制时不会有其他线程干扰。
state != NodeState.STATE_LEADER:如果当前节点不是领导者,直接返回 false,表示日志复制失败。

RaftProto.LogEntry logEntry = RaftProto.LogEntry.newBuilder().setTerm(currentTerm).setType(entryType).setData(ByteString.copyFrom(data)).build();

创建日志条目:根据传入的 data 和 entryType 构建一个新的日志条目(LogEntry),并将其设置为当前任期(currentTerm)和指定的条目类型(entryType)。

List<RaftProto.LogEntry> entries = new ArrayList<>();entries.add(logEntry);newLastLogIndex = raftLog.append(entries);

日志条目追加到 Raft 日志中:将创建的 logEntry 加入到 Raft 日志,并返回新的日志索引(newLastLogIndex)
raftLog.append(entries):负责将日志条目写入日志存储中,并返回新添加条目的索引。

for (RaftProto.Server server : configuration.getServersList()) {final Peer peer = peerMap.get(server.getServerId());executorService.submit(new Runnable() {@Overridepublic void run() {appendEntries(peer);}});
}

日志复制到其他节点:遍历 Raft 集群中的所有服务器,将日志条目异步地发送到每个节点。每个节点会通过 appendEntries(peer) 方法来处理日志复制。
executorService.submit():用于提交异步任务,确保日志条目的复制不阻塞当前线程。

if (raftOptions.isAsyncWrite()) {// 主节点写成功后,就返回。return true;
}

异步写入:如果配置为异步写入(isAsyncWrite()),replicate() 方法会立即返回 true,表示日志条目已经提交,客户端可以认为写入操作已经完成。
return true:即使日志没有完全复制到所有节点,领导者节点也会返回 true,表示写请求已经成功提交。

// 同步等待日志复制到大多数节点
long startTime = System.currentTimeMillis();
while (lastAppliedIndex < newLastLogIndex) {if (System.currentTimeMillis() - startTime >= raftOptions.getMaxAwaitTimeout()) {break;  // 超时退出}commitIndexCondition.await(raftOptions.getMaxAwaitTimeout(), TimeUnit.MILLISECONDS);
}

同步等待:如果不是异步写入,则进入同步等待阶段。通过 commitIndexCondition.await() 方法阻塞当前线程,直到 commitIndex 达到或超过 newLastLogIndex(即日志复制完成)
超时机制:如果在等待期间超时,循环会退出。

领导者将日志复制到多数节点后,更新 commitIndex 并广播。
跟随者收到广播后,将 commitIndex 之前的日志应用到状态机(更新 lastAppliedIndex)

客户端需要等待操作被提交后才能确认成功

} catch (Exception ex) {ex.printStackTrace();  // 异常处理
} finally {lock.unlock();  // 释放锁
}LOG.debug("lastAppliedIndex={} newLastLogIndex={}", lastAppliedIndex, newLastLogIndex);
if (lastAppliedIndex < newLastLogIndex) {return false;  // 如果日志没有成功复制到大多数节点,返回 false
}
return true;  // 日志复制成功,返回 true

日志复制状态检查:如果 lastAppliedIndex 小于 newLastLogIndex,说明日志还没有成功复制到大多数节点,方法会返回 false。
最终返回值:如果日志复制成功,返回 true。

思考:为什么异步直接返回 true,同步不行

        在异步写入模式下,领导者节点接收到客户端的写请求后,不会等待日志复制的完成或提交。它会立即返回 true,告诉客户端请求已经成功提交。异步写入的设计目的是提高性能,减少等待时间,允许客户端快速继续执行下一步操作,而不需要等到日志复制到所有节点。缺点是日志复制失败或超时的情况下,客户端并不知情。

        在同步写入模式下,领导者节点在返回客户端之前会等待日志条目被复制到大多数节点,并且等待 commitIndex 达到或超过新日志条目的索引(newLastLogIndex)。这样可以保证客户端的写请求在日志复制完成后才会被认为是成功的。

异步写入:
目的是提高性能,降低响应延迟。

客户端在请求发送后不会等待日志复制完成,领导者会立即返回 true,表示请求已提交。

这种方式适合不需要强一致性的场景,比如需要高吞吐量且可以容忍数据丢失的系统。

同步写入:
目的是保证日志的持久性和一致性。

客户端在请求发送后会等待日志条目被复制到大多数节点并提交,确保写入操作被可靠地记录。

这种方式适合需要强一致性的场景,比如数据库或关键系统,确保数据在大多数节点之间一致。


尚未完结

http://www.lryc.cn/news/586279.html

相关文章:

  • 区块链平台之以太坊深入解读:技术、经济与生态的全面解析
  • el-tree 懒加载 loadNode
  • 可穿戴智能硬件在国家安全领域的应用
  • 【设计模式】装饰(器)模式 透明装饰模式与半透明装饰模式
  • Lua ADB 接口文档
  • GGE Lua 详细教程
  • C# 接口(派生成员作为实现)
  • nginx反向代理实现跨域请求
  • 分层架构的C++高并发内存池性能优化
  • STP生成树协议
  • Eureka实战
  • Linux - 安全排查 3
  • 带货视频评论洞察 Baseline 学习笔记 (Datawhale Al夏令营)
  • 【读书笔记】《C++ Software Design》第一章《The Art of Software Design》
  • 【大模型面试】50道大型语言模型(LLM)面试问题汇总,看完少走99%弯路!
  • 不止于监控:深入剖析OpenTelemetry的可观察性生态体系
  • LeetCode 3169.无需开会的工作日:排序+一次遍历——不需要正难则反,因为正着根本不难
  • 暑期前端训练day6
  • 历史数据分析——云南白药
  • 连接池的核心接口和常用属性
  • 基于大模型的鼻咽癌全周期预测及诊疗优化研究报告
  • SQL新手入门详细教程和应用实例
  • 零基础 “入坑” Java--- 九、类和对象(二)
  • 芯片验证之验证策略
  • 【MogDB】一种基于ctid分片并发查询以提升大表查询性能的方式
  • 68 指针的减法操作
  • 【Datawhale AI夏令营】Task2 笔记:MCP Server开发的重难点
  • 使用包管理工具CocoaPods、SPM、Carthage的利弊与趋势
  • tiktok 弹幕 逆向分析
  • 系统性能评估方法深度解析:从经典到现代