基于MySQL实现分布式调度系统的选举算法
基于MySQL实现分布式调度系统的选举算法,可通过基于超时机制的数据库表驱动选主方案实现,利用数据库表协调节点间的状态,以下是Java实现的核心步骤和代码:
1. 数据库表设计
创建选主表,用于记录leader节点和状态:
CREATE TABLE leader_election (service_id varchar(128) NOT NULL,leader_id varchar(128) NOT NULL,last_seen_active timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (service_id)
) ENGINE=InnoDB
2. Java实现选举机制
import java.sql.*;
import java.util.concurrent.*;public class LeaderElection {private final String serviceId; // 服务标识(如"distributed-scheduler")private final String nodeId; // 当前节点ID(如hostname_pid)private final int electionTimeout; // 选举超时时间(秒)private final ScheduledExecutorService scheduler;private volatile boolean isLeader = false;private Connection conn; // MySQL连接public LeaderElection(String serviceId, String nodeId, int timeout) {this.serviceId = serviceId;this.nodeId = nodeId;this.electionTimeout = timeout;this.scheduler = Executors.newSingleThreadScheduledExecutor();// 初始化数据库连接this.conn = DriverManager.getConnection("jdbc:mysql://mysql-host:3306/db", "user", "pwd");}// 启动周期性选举public void start() {scheduler.scheduleAtFixedRate(this::attemptLeadership, 0, 1, TimeUnit.SECONDS);}// 尝试获取领导权private void attemptLeadership() {String sql = "INSERT INTO leader_election (service_id, leader_id, last_seen_active) " +"VALUES (?, ?, NOW()) " +"ON DUPLICATE KEY UPDATE " +"leader_id = IF(last_seen_active < NOW() - INTERVAL ? SECOND, VALUES(leader_id), leader_id), " +"last_seen_active = IF(leader_id = VALUES(leader_id), NOW(), last_seen_active)";try (PreparedStatement stmt = conn.prepareStatement(sql)) {stmt.setString(1, serviceId);stmt.setString(2, nodeId);stmt.setInt(3, electionTimeout); // 超时时间(如20秒)stmt.executeUpdate();updateLeaderStatus(); // 更新当前节点状态} catch (SQLException e) {e.printStackTrace();}}// 检查当前节点是否Leaderprivate void updateLeaderStatus() throws SQLException {String query = "SELECT COUNT(*) AS is_leader FROM leader_election " +"WHERE service_id=? AND leader_id=?";try (PreparedStatement stmt = conn.prepareStatement(query)) {stmt.setString(1, serviceId);stmt.setString(2, nodeId);ResultSet rs = stmt.executeQuery();if (rs.next()) {isLeader = rs.getInt("is_leader") > 0;if (isLeader) System.out.println("✅ Current node is LEADER");}}}// 关闭资源public void shutdown() {scheduler.shutdown();if (conn != null) try { conn.close(); } catch (SQLException ignored) {}}
}
3. 核心逻辑说明
-
节点初始化
每个节点启动时传入唯一nodeId
(如hostname_pid
),并连接到MySQL。 -
领导权竞争逻辑
- 使用
ON DUPLICATE KEY UPDATE
原子操作保证竞争安全 :
若当前无Leader或Leader超时(last_seen_active < NOW() - 20s
),则抢占为Leader。 - 节点周期性(如每秒)尝试更新状态,确保活跃Leader持续续期。
- 使用
-
状态更新
- Leader节点:
last_seen_active
字段被更新为当前时间。 - Follower节点:未更新字段,仅检测自身是否为Leader 3。
- Leader节点:
4. 高级功能扩展
// 强制指定Leader(管理员操作)
public void forceLeadership() throws SQLException {String sql = "REPLACE INTO leader_election (service_id, leader_id, last_seen_active) VALUES (?, ?, NOW())";try (PreparedStatement stmt = conn.prepareStatement(sql)) {stmt.setString(1, serviceId);stmt.setString(2, nodeId);stmt.executeUpdate();}
}// 查询当前Leader
public String getCurrentLeader() throws SQLException {String sql = "SELECT leader_id FROM leader_election WHERE service_id=?";try (PreparedStatement stmt = conn.prepareStatement(sql)) {stmt.setString(1, serviceId);ResultSet rs = stmt.executeQuery();return rs.next() ? rs.getString("leader_id") : null;}
}
5. 算法优点
- 去中心化:依赖MySQL而非额外组件(如ZK/etcd),降低运维成本 3。
- 容错性:Leader故障后,超时机制自动触发重选举 31。
- 快速响应:节点秒级感知Leader变更(JDBC轮询)。
注意事项
- 连接池优化:使用
HikariCP
等连接池避免频繁创建连接。 - 超时时间:根据网络延迟调整
electionTimeout
(建议≥10秒)。 - 多节点隔离:各节点使用独立MySQL连接,避免事务冲突 3。
此方案适用于中小规模集群。如需强一致性高可用场景(如金融系统),可改用Raft算法(如Apache Ratis),但需引入额外组件。