具有熔断能力和活性探测的服务负载均衡解决方案
一、整体架构设计
1.核心组件
- 负载均衡器:负责选择可用的服务节点
- 健康检查器:定期检测服务节点的可用性
- 服务节点管理:维护所有可用节点的状态信息
2.负载均衡策略
- 轮询(Round Robin)
- 随机(Random)
- 加权轮询(Weighted Round Robin)
- 最少连接(Least Connection)
- 一致性哈希(Consistent Hash)
二、完整代码
1.定义服务节点类
/**
 * A single backend instance that the load balancer can route to, together with
 * its liveness flag, connection count and a simple circuit breaker.
 *
 * <p>Circuit-breaker model:
 * <ul>
 *   <li>CLOSED: fewer than {@link #FAILURE_THRESHOLD} recorded failures; availability
 *       follows the {@code active} flag.</li>
 *   <li>OPEN: threshold reached and less than {@link #CIRCUIT_BREAKER_TIMEOUT} ms elapsed
 *       since the circuit opened; the node is unavailable.</li>
 *   <li>HALF-OPEN: threshold reached and the timeout has elapsed; the node is offered
 *       again so that a trial request can close or re-open the circuit.</li>
 * </ul>
 *
 * <p>Breaker state is guarded by this object's monitor because it is updated from both
 * the health-check thread and request threads. The remaining mutable fields are
 * {@code volatile} for visibility only; read-modify-write sequences performed by callers
 * (for example {@code setCurrentConnections(getCurrentConnections() + 1)}) are not atomic.
 */
public class ServiceNode {

    /** Number of recorded failures at which the circuit opens. */
    public static final int FAILURE_THRESHOLD = 3;

    /** Time in milliseconds the circuit stays open before a trial request is allowed. */
    public static final long CIRCUIT_BREAKER_TIMEOUT = 30000; // 30 seconds

    private String ip;
    private int port;
    private volatile boolean active;
    private int weight;
    private volatile int currentConnections;
    private volatile long lastActiveTime;

    // Circuit-breaker state, guarded by "this".
    private int failureCount;
    private long circuitOpenedTime;

    /**
     * Creates an empty node; all fields keep their Java defaults, so the node is
     * inactive until {@link #setActive(boolean)} is called.
     */
    public ServiceNode() {
    }

    /**
     * Creates a node that is considered active until a health check or request says otherwise.
     *
     * @param ip     host name or IP address of the instance
     * @param port   TCP port of the instance
     * @param weight relative weight used by weighted strategies
     */
    public ServiceNode(String ip, int port, int weight) {
        this.ip = ip;
        this.port = port;
        this.weight = weight;
        this.active = true;
    }

    /**
     * Tells whether the node may receive a request, taking the circuit breaker into account.
     *
     * @return {@code false} while the circuit is open; {@code true} once the breaker timeout
     *         has elapsed (half-open); otherwise the value of the {@code active} flag
     */
    public synchronized boolean isAvailable() {
        if (failureCount >= FAILURE_THRESHOLD) {
            // Past the timeout the node is half-open: let a trial request through.
            return System.currentTimeMillis() - circuitOpenedTime > CIRCUIT_BREAKER_TIMEOUT;
        }
        return active;
    }

    /**
     * Returns the raw liveness flag without consulting the circuit breaker.
     *
     * @return the value last passed to {@link #setActive(boolean)}
     */
    public boolean isHealthy() {
        return active;
    }

    /**
     * Records one failure; once the threshold is reached the circuit (re-)opens and the
     * open timestamp is refreshed.
     */
    public synchronized void recordFailure() {
        failureCount++;
        if (failureCount >= FAILURE_THRESHOLD) {
            circuitOpenedTime = System.currentTimeMillis();
        }
    }

    /** Records a success and closes the circuit by clearing the failure state. */
    public synchronized void recordSuccess() {
        failureCount = 0;
        circuitOpenedTime = 0;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public boolean isActive() {
        return active;
    }

    public void setActive(boolean active) {
        this.active = active;
    }

    public int getWeight() {
        return weight;
    }

    public void setWeight(int weight) {
        this.weight = weight;
    }

    public int getCurrentConnections() {
        return currentConnections;
    }

    public void setCurrentConnections(int currentConnections) {
        this.currentConnections = currentConnections;
    }

    public long getLastActiveTime() {
        return lastActiveTime;
    }

    public void setLastActiveTime(long lastActiveTime) {
        this.lastActiveTime = lastActiveTime;
    }

    public synchronized int getFailureCount() {
        return failureCount;
    }

    public synchronized long getCircuitOpenedTime() {
        return circuitOpenedTime;
    }
}
2.负载均衡接口
/**
 * Strategy for choosing a {@link ServiceNode} and for feeding request outcomes back
 * into the node's state.
 */
public interface LoadBalancer {

    /**
     * Picks the node that should serve the next request.
     *
     * @param nodes all known nodes, available or not
     * @return the selected node
     */
    ServiceNode selectNode(List<ServiceNode> nodes);

    /**
     * Called after a request to {@code node} succeeded: refreshes the last-active
     * timestamp and releases the connection slot taken at selection time.
     *
     * @param node the node that served the request
     */
    default void onRequestSuccess(ServiceNode node) {
        node.setLastActiveTime(System.currentTimeMillis());
        // Clamp at zero so a duplicate callback cannot drive the count negative.
        node.setCurrentConnections(Math.max(0, node.getCurrentConnections() - 1));
    }

    /**
     * Called after a request to {@code node} failed: marks the node inactive and
     * releases the connection slot taken at selection time, mirroring
     * {@link #onRequestSuccess(ServiceNode)} so failed requests do not leak slots.
     *
     * @param node the node that failed to serve the request
     */
    default void onRequestFail(ServiceNode node) {
        node.setActive(false);
        node.setCurrentConnections(Math.max(0, node.getCurrentConnections() - 1));
    }
}
3.实现不同的负载均衡策略
轮询策略,考虑熔断状态
public class RoundRobinLoadBalancer implements LoadBalancer {private final AtomicInteger counter = new AtomicInteger(0);@Overridepublic ServiceNode selectNode(List<ServiceNode> nodes) {List<ServiceNode> availableNodes = nodes.stream().filter(ServiceNode::isAvailable).collect(Collectors.toList());if (availableNodes.isEmpty()) {// 尝试从不可用节点中选择已经过了熔断超时的节点List<ServiceNode> halfOpenNodes = nodes.stream().filter(n -> !n.isHealthy() && n.getFailureCount() >= ServiceNode.FAILURE_THRESHOLD &&System.currentTimeMillis() - n.getCircuitOpenedTime() > ServiceNode.CIRCUIT_BREAKER_TIMEOUT).collect(Collectors.toList());if (!halfOpenNodes.isEmpty()) {// 选择其中一个进入半开状态的节点int index = counter.getAndIncrement() % halfOpenNodes.size();ServiceNode selected = halfOpenNodes.get(index);selected.setCurrentConnections(selected.getCurrentConnections() + 1);return selected;}throw new RuntimeException("No available nodes");}int index = counter.getAndIncrement() % availableNodes.size();ServiceNode selected = availableNodes.get(index);selected.setCurrentConnections(selected.getCurrentConnections() + 1);return selected;}@Overridepublic void onRequestSuccess(ServiceNode node) {node.recordSuccess();node.setLastActiveTime(System.currentTimeMillis());node.setCurrentConnections(node.getCurrentConnections() - 1);}@Overridepublic void onRequestFail(ServiceNode node) {node.recordFailure();node.setActive(false);node.setCurrentConnections(node.getCurrentConnections() - 1);}
}
加权轮询策略
public class WeightedRoundRobinLoadBalancer implements LoadBalancer {private final AtomicInteger counter = new AtomicInteger(0);@Overridepublic ServiceNode selectNode(List<ServiceNode> nodes) {List<ServiceNode> activeNodes = nodes.stream().filter(ServiceNode::isHealthy).collect(Collectors.toList());if (activeNodes.isEmpty()) {throw new RuntimeException("No available nodes");}int totalWeight = activeNodes.stream().mapToInt(ServiceNode::getWeight).sum();int current = counter.getAndIncrement() % totalWeight;for (ServiceNode node : activeNodes) {if (current < node.getWeight()) {node.setCurrentConnections(node.getCurrentConnections() + 1);return node;}current -= node.getWeight();}// fallback to simple round robinint index = counter.get() % activeNodes.size();ServiceNode selected = activeNodes.get(index);selected.setCurrentConnections(selected.getCurrentConnections() + 1);return selected;}
}
4.健康检查器
public class HealthChecker {private final List<ServiceNode> nodes;private final ScheduledExecutorService scheduler;private final HttpClient httpClient;public HealthChecker(List<ServiceNode> nodes) {this.nodes = nodes;this.scheduler = Executors.newSingleThreadScheduledExecutor();this.httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).connectTimeout(Duration.ofSeconds(2)).build();}public void start() {scheduler.scheduleAtFixedRate(this::checkAllNodes, 0, 10, TimeUnit.SECONDS);}public void stop() {scheduler.shutdown();}private void checkAllNodes() {nodes.parallelStream().forEach(this::checkNode);}private void checkNode(ServiceNode node) {// 如果节点处于熔断状态但未超时,不检查if (node.getFailureCount() >= ServiceNode.FAILURE_THRESHOLD && System.currentTimeMillis() - node.getCircuitOpenedTime() < ServiceNode.CIRCUIT_BREAKER_TIMEOUT) {return;}try {HttpRequest request = HttpRequest.newBuilder().uri(URI.create("http://" + node.getIp() + ":" + node.getPort() + "/health")).timeout(Duration.ofSeconds(2)).GET().build();HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());if (response.statusCode() == 200) {node.setActive(true);node.recordSuccess(); // 重置熔断状态node.setLastActiveTime(System.currentTimeMillis());} else {node.recordFailure();node.setActive(false);}} catch (Exception e) {node.recordFailure();node.setActive(false);}}
}
5.服务调用封装
/**
 * Sends JSON POST requests to a backend chosen by the configured {@link LoadBalancer}
 * and reports each outcome back to it exactly once.
 */
public class ExternalServiceInvoker {

    private final LoadBalancer loadBalancer;
    private final List<ServiceNode> nodes;
    private final HttpClient httpClient;

    /**
     * @param loadBalancer strategy used to pick a node and to receive outcome callbacks
     * @param nodes        all known nodes, passed to the load balancer on every call
     */
    public ExternalServiceInvoker(LoadBalancer loadBalancer, List<ServiceNode> nodes) {
        this.loadBalancer = loadBalancer;
        this.nodes = nodes;
        this.httpClient = HttpClient.newHttpClient();
    }

    /**
     * Posts {@code requestBody} to {@code path} on a node selected by the load balancer.
     *
     * @param path        request path appended to {@code http://<ip>:<port>}
     * @param requestBody JSON payload
     * @return the response body when the status is 2xx
     * @throws RuntimeException if no node can be selected or the status is not 2xx
     * @throws Exception        any failure raised while building or sending the request
     */
    public String invokeService(String path, String requestBody) throws Exception {
        // If selection itself fails there is no node to report against; let it propagate.
        ServiceNode node = loadBalancer.selectNode(nodes);

        HttpResponse<String> response;
        try {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://" + node.getIp() + ":" + node.getPort() + path))
                    .header("Content-Type", "application/json")
                    .timeout(Duration.ofSeconds(5))
                    .POST(HttpRequest.BodyPublishers.ofString(requestBody))
                    .build();
            response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            loadBalancer.onRequestFail(node);
            throw e;
        }

        // The status check sits outside the try block so that a non-2xx response is
        // reported to the load balancer once, not once here and again in the catch.
        int status = response.statusCode();
        if (status >= 200 && status < 300) {
            loadBalancer.onRequestSuccess(node);
            return response.body();
        }
        loadBalancer.onRequestFail(node);
        throw new RuntimeException("Service call failed with status: " + status);
    }
}
三、在Spring Boot中的使用
1.配置为Spring Bean
/**
 * Spring wiring for the node list, the load-balancing strategy, the health checker
 * and the service invoker.
 */
@Configuration
public class LoadBalancerConfiguration {

    /** Static list of backend nodes, all with weight 1. */
    @Bean
    public List<ServiceNode> serviceNodes() {
        return Arrays.asList(
                new ServiceNode("192.168.1.101", 8080, 1),
                new ServiceNode("192.168.1.102", 8080, 1),
                new ServiceNode("192.168.1.103", 8080, 1),
                new ServiceNode("192.168.1.104", 8080, 1));
    }

    /** Strategy used by the invoker. */
    @Bean
    public LoadBalancer loadBalancer() {
        return new WeightedRoundRobinLoadBalancer();
    }

    /**
     * Health checker whose lifecycle is managed by the container: {@code start()} is
     * invoked after construction and {@code stop()} when the context closes, so the
     * scheduler thread does not outlive the application context.
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    public HealthChecker healthChecker(List<ServiceNode> nodes) {
        return new HealthChecker(nodes);
    }

    /** Invoker that routes outgoing calls through the configured load balancer. */
    @Bean
    public ExternalServiceInvoker externalServiceInvoker(LoadBalancer loadBalancer, List<ServiceNode> nodes) {
        return new ExternalServiceInvoker(loadBalancer, nodes);
    }
}
2.在Controller中使用
/**
 * REST entry point that forwards the incoming request body to the external service
 * through {@link ExternalServiceInvoker}.
 */
@RestController
@RequestMapping("/api")
public class MyController {

    @Autowired
    private ExternalServiceInvoker serviceInvoker;

    /**
     * Relays the request body to {@code /external-api} on a load-balanced node.
     *
     * @param request raw request body
     * @return 200 with the upstream body, or 500 with the failure message
     */
    @PostMapping("/call-external")
    public ResponseEntity<String> callExternalService(@RequestBody String request) {
        final String upstreamBody;
        try {
            upstreamBody = serviceInvoker.invokeService("/external-api", request);
        } catch (Exception failure) {
            String errorText = "Service call failed: " + failure.getMessage();
            return ResponseEntity.status(500).body(errorText);
        }
        return ResponseEntity.ok(upstreamBody);
    }
}
四、总结与最佳实践
- 选择合适的负载均衡策略:
  - 对于性能相近的节点,使用轮询或随机
  - 对于性能差异大的节点,使用加权轮询
  - 对于长连接服务,考虑最少连接算法
- 健康检查配置建议:
  - 检查间隔:5-30秒
  - 超时时间:1-3秒
  - 检查端点:/health或/actuator/health
- 生产环境注意事项:
  - 添加日志记录节点选择过程和健康状态变化
  - 实现优雅降级,当所有节点不可用时返回缓存数据或友好错误
  - 考虑使用分布式配置中心动态调整节点列表
- 性能优化:
  - 使用连接池减少连接建立开销
  - 实现请求重试机制(带退避策略)
  - 添加熔断器模式防止级联故障
- 结合熔断机制:
  - 完整的熔断机制:基于失败计数和超时的自动熔断/恢复
  - 三种状态:
    - 关闭(CLOSED):正常状态
    - 打开(OPEN):熔断状态,拒绝请求
    - 半开(HALF-OPEN):尝试恢复状态
  - 与负载均衡深度集成:
    - 节点选择时自动跳过熔断中的节点
    - 健康检查会重置成功节点的熔断状态
    - 支持半开状态下的试探性请求
  - 可视化监控:通过REST端点查看节点和熔断状态