当前位置: 首页 > news >正文

具有熔断能力和活性探测的服务负载均衡解决方案

一、整体架构设计

1.核心组件
  • 负载均衡器:负责选择可用的服务节点
  • 健康检查器:定期检测服务节点的可用性
  • 服务节点管理:维护所有可用节点的状态信息
2.负载均衡策略
  • 轮询(Round Robin)
  • 随机(Random)
  • 加权轮询(Weighted Round Robin)
  • 最少连接(Least Connection)
  • 一致性哈希(Consistent Hash)

二、完整代码

1.定义节点服务类
public class ServiceNode {private String ip;private int port;private boolean active;private int weight;private int currentConnections;private long lastActiveTime;// 熔断相关属性private int failureCount;private long circuitOpenedTime;private static final int FAILURE_THRESHOLD = 3;private static final long CIRCUIT_BREAKER_TIMEOUT = 30000; // 30秒// 判断节点是否可用(考虑熔断状态)public boolean isAvailable() {if (failureCount >= FAILURE_THRESHOLD) {// 检查是否应该尝试恢复if (System.currentTimeMillis() - circuitOpenedTime > CIRCUIT_BREAKER_TIMEOUT) 			  {return true; // 进入半开状态}return false; // 熔断中}return active; // 正常状态}public void recordFailure() {failureCount++;if (failureCount >= FAILURE_THRESHOLD) {circuitOpenedTime = System.currentTimeMillis();}}public void recordSuccess() {failureCount = 0;circuitOpenedTime = 0;}// 其他getter/setter方法...
}
2.负载均衡接口
public interface LoadBalancer {ServiceNode selectNode(List<ServiceNode> nodes);default void onRequestSuccess(ServiceNode node) {node.setLastActiveTime(System.currentTimeMillis());node.setCurrentConnections(node.getCurrentConnections() - 1);}default void onRequestFail(ServiceNode node) {node.setActive(false);}
}
3.实现不同的负载均衡策略
轮询策略,考虑熔断状态
public class RoundRobinLoadBalancer implements LoadBalancer {private final AtomicInteger counter = new AtomicInteger(0);@Overridepublic ServiceNode selectNode(List<ServiceNode> nodes) {List<ServiceNode> availableNodes = nodes.stream().filter(ServiceNode::isAvailable).collect(Collectors.toList());if (availableNodes.isEmpty()) {// 尝试从不可用节点中选择已经过了熔断超时的节点List<ServiceNode> halfOpenNodes = nodes.stream().filter(n -> !n.isHealthy() && n.getFailureCount() >= ServiceNode.FAILURE_THRESHOLD &&System.currentTimeMillis() - n.getCircuitOpenedTime() > ServiceNode.CIRCUIT_BREAKER_TIMEOUT).collect(Collectors.toList());if (!halfOpenNodes.isEmpty()) {// 选择其中一个进入半开状态的节点int index = counter.getAndIncrement() % halfOpenNodes.size();ServiceNode selected = halfOpenNodes.get(index);selected.setCurrentConnections(selected.getCurrentConnections() + 1);return selected;}throw new RuntimeException("No available nodes");}int index = counter.getAndIncrement() % availableNodes.size();ServiceNode selected = availableNodes.get(index);selected.setCurrentConnections(selected.getCurrentConnections() + 1);return selected;}@Overridepublic void onRequestSuccess(ServiceNode node) {node.recordSuccess();node.setLastActiveTime(System.currentTimeMillis());node.setCurrentConnections(node.getCurrentConnections() - 1);}@Overridepublic void onRequestFail(ServiceNode node) {node.recordFailure();node.setActive(false);node.setCurrentConnections(node.getCurrentConnections() - 1);}
}
加权轮询策略
public class WeightedRoundRobinLoadBalancer implements LoadBalancer {private final AtomicInteger counter = new AtomicInteger(0);@Overridepublic ServiceNode selectNode(List<ServiceNode> nodes) {List<ServiceNode> activeNodes = nodes.stream().filter(ServiceNode::isHealthy).collect(Collectors.toList());if (activeNodes.isEmpty()) {throw new RuntimeException("No available nodes");}int totalWeight = activeNodes.stream().mapToInt(ServiceNode::getWeight).sum();int current = counter.getAndIncrement() % totalWeight;for (ServiceNode node : activeNodes) {if (current < node.getWeight()) {node.setCurrentConnections(node.getCurrentConnections() + 1);return node;}current -= node.getWeight();}// fallback to simple round robinint index = counter.get() % activeNodes.size();ServiceNode selected = activeNodes.get(index);selected.setCurrentConnections(selected.getCurrentConnections() + 1);return selected;}
}
4.健康检查器
public class HealthChecker {private final List<ServiceNode> nodes;private final ScheduledExecutorService scheduler;private final HttpClient httpClient;public HealthChecker(List<ServiceNode> nodes) {this.nodes = nodes;this.scheduler = Executors.newSingleThreadScheduledExecutor();this.httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).connectTimeout(Duration.ofSeconds(2)).build();}public void start() {scheduler.scheduleAtFixedRate(this::checkAllNodes, 0, 10, TimeUnit.SECONDS);}public void stop() {scheduler.shutdown();}private void checkAllNodes() {nodes.parallelStream().forEach(this::checkNode);}private void checkNode(ServiceNode node) {// 如果节点处于熔断状态但未超时,不检查if (node.getFailureCount() >= ServiceNode.FAILURE_THRESHOLD && System.currentTimeMillis() - node.getCircuitOpenedTime() < ServiceNode.CIRCUIT_BREAKER_TIMEOUT) {return;}try {HttpRequest request = HttpRequest.newBuilder().uri(URI.create("http://" + node.getIp() + ":" + node.getPort() + "/health")).timeout(Duration.ofSeconds(2)).GET().build();HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());if (response.statusCode() == 200) {node.setActive(true);node.recordSuccess(); // 重置熔断状态node.setLastActiveTime(System.currentTimeMillis());} else {node.recordFailure();node.setActive(false);}} catch (Exception e) {node.recordFailure();node.setActive(false);}}
}
5.服务调用封装
public class ExternalServiceInvoker {private final LoadBalancer loadBalancer;private final List<ServiceNode> nodes;private final HttpClient httpClient;public ExternalServiceInvoker(LoadBalancer loadBalancer, List<ServiceNode> nodes) {this.loadBalancer = loadBalancer;this.nodes = nodes;this.httpClient = HttpClient.newHttpClient();}public String invokeService(String path, String requestBody) throws Exception {ServiceNode node = null;try {node = loadBalancer.selectNode(nodes);HttpRequest request = HttpRequest.newBuilder().uri(URI.create("http://" + node.getIp() + ":" + node.getPort() + path)).header("Content-Type", "application/json").timeout(Duration.ofSeconds(5)).POST(HttpRequest.BodyPublishers.ofString(requestBody)).build();HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());if (response.statusCode() >= 200 && response.statusCode() < 300) {loadBalancer.onRequestSuccess(node);return response.body();} else {loadBalancer.onRequestFail(node);throw new RuntimeException("Service call failed with status: " + response.statusCode());}} catch (Exception e) {if (node != null) {loadBalancer.onRequestFail(node);}throw e;}}
}

三、在Spring Boot中的使用

1.配置为Spring Bean
@Configuration
public class LoadBalancerConfiguration {@Beanpublic List<ServiceNode> serviceNodes() {return Arrays.asList(new ServiceNode("192.168.1.101", 8080, 1),new ServiceNode("192.168.1.102", 8080, 1),new ServiceNode("192.168.1.103", 8080, 1),new ServiceNode("192.168.1.104", 8080, 1));}@Beanpublic LoadBalancer loadBalancer() {return new WeightedRoundRobinLoadBalancer();}@Beanpublic HealthChecker healthChecker(List<ServiceNode> nodes) {HealthChecker checker = new HealthChecker(nodes);checker.start();return checker;}@Beanpublic ExternalServiceInvoker externalServiceInvoker(LoadBalancer loadBalancer, List<ServiceNode> nodes) {return new ExternalServiceInvoker(loadBalancer, nodes);}
}
2.在Controller中使用
@RestController
@RequestMapping("/api")
public class MyController {@Autowiredprivate ExternalServiceInvoker serviceInvoker;@PostMapping("/call-external")public ResponseEntity<String> callExternalService(@RequestBody String request) {try {String response = serviceInvoker.invokeService("/external-api", request);return ResponseEntity.ok(response);} catch (Exception e) {return ResponseEntity.status(500).body("Service call failed: " + e.getMessage());}}
}

四、总结与最佳实践

  1. 选择合适的负载均衡策略

    • 对于性能相近的节点,使用轮询或随机
    • 对于性能差异大的节点,使用加权轮询
    • 对于长连接服务,考虑最少连接算法
  2. 健康检查配置建议

    • 检查间隔:5-30秒
    • 超时时间:1-3秒
    • 检查端点:/health或/actuator/health
  3. 生产环境注意事项

    • 添加日志记录节点选择过程和健康状态变化
    • 实现优雅降级,当所有节点不可用时返回缓存数据或友好错误
    • 考虑使用分布式配置中心动态调整节点列表
  4. 性能优化

    • 使用连接池减少连接建立开销
    • 实现请求重试机制(带退避策略)
    • 添加熔断器模式防止级联故障
  5. 结合熔断机制

    1. 完整的熔断机制:基于失败计数和超时的自动熔断/恢复
    2. 三种状态
      • 关闭(CLOSED):正常状态
      • 打开(OPEN):熔断状态,拒绝请求
      • 半开(HALF-OPEN):尝试恢复状态
    3. 与负载均衡深度集成
      • 节点选择时自动跳过熔断中的节点
      • 健康检查会重置成功节点的熔断状态
      • 支持半开状态下的试探性请求
    4. 可视化监控:通过REST端点查看节点和熔断状态
http://www.lryc.cn/news/620175.html

相关文章:

  • Linux编程 IO(标准io,文件io,目录io)
  • 机器学习⑤【线性回归(Linear Regression】
  • springboot接口请求参数校验
  • web开发,在线%射击比赛管理%系统开发demo,基于html,css,jquery,python,django,三层mysql数据库
  • 锂电池自动化生产线:智能制造重塑能源产业格局
  • 【完整源码+数据集+部署教程】医学报告图像分割系统源码和数据集:改进yolo11-HGNetV2
  • 深度学习与遥感入门(七)|CNN vs CNN+形态学属性(MP):特征工程到底值不值?
  • 【工具】雀语queyu文件批量下载 文档内容复刻导出
  • Socket 套接字的学习--UDP
  • wps--设置
  • 大模型推理框架vLLM 中的Prompt缓存实现原理
  • GaussDB 权限管理的系统性技术解析与实践指南
  • Windows 系统 上尝试直接运行 .sh(Shell 脚本)文件
  • OpenFeign 服务调用原理与源码分析
  • Endnote 20超详细入门教程(实现参考论文的插入)
  • 类和对象(中下)
  • ARM芯片架构之CoreSight Channel Interface 介绍
  • 机器学习-Cluster
  • 机器学习——svm支持向量机
  • mac下载maven并配置,以及idea配置
  • O2OA:数字化转型中安全与效率的平衡之道
  • 深入理解 uni-app 的 uni.createSelectorQuery()
  • 云电竞游戏盒子技术分析
  • BAW56LT1G ON安森美 双串联开关二极管 电子元器件
  • Linux系统文件完整性检查工具AIDE在生产环境中推送钉钉告警
  • Nginx Stream代理绕过网络隔离策略
  • 雨量系列篇一:翻斗雨量传感器与压电雨量传感器的区别是什么
  • 古诗词多媒体内容生成工作流文档操作流程
  • 杂记 01
  • reactive和ref使用方法及场景