当前位置: 首页 > news >正文

Spring Cloud Alibaba Sentinel 源码阅读之流量控制算法

流量统计数据结构

几个重要的类

每一个资源的访问统计都被封装成一个StatisticNode用来记录各项指标

class StatisticNode{Metric rollingCounterInSecond = new ArrayMetric(2,1000);
}

每个StatisticNode中有一个ArrayMetric负责具体的数据统计

class ArrayMetric{LeapArray<MetricBucket> data = new OccupiableBucketLeapArray(2, 1000);
}

LeapArray 负责维护一个循环数组,每个元素代表一个时间窗口的数据

class LeapArray{protected int windowLengthInMs;//单个窗口的长度(毫秒)protected int sampleCount;// 窗口数量protected int intervalInMs;//总的统计时长 = windowLengthInMs * sampleCountprivate double intervalInSecond;protected final AtomicReferenceArray<WindowWrap<T>> array;public LeapArray(int sampleCount, int intervalInMs) {this.windowLengthInMs = intervalInMs / sampleCount;this.intervalInMs = intervalInMs;this.intervalInSecond = intervalInMs / 1000.0;this.sampleCount = sampleCount;this.array = new AtomicReferenceArray<>(sampleCount);}
}

WindowWrap: 是 LeapArray 数组中的一个元素,它包装了一个 MetricBucket 以及该窗口的开始时间戳。

public class WindowWrap<T> {private final long windowStart; // 窗口的开始时间private final int windowLength; // 窗口的长度private volatile T value;       // 实际存储的 MetricBucket
}

MetricBucket: 存储在一个时间窗口内(例如 200ms)的各项指标,如通过请求数、拒绝请求数、异常数等。它内部使用 LongAdder 来保证并发安全地更新计数。

StatisticSlot流量统计

看完了数据结构,继续来看计数流程。开始位置还要冲slot的entry方法

StatisticSlot#entry()

    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {try {//调用slot链fireEntry(context, resourceWrapper, node, count, prioritized, args);// 请求通过,增加线程数和请求通过数node.increaseThreadNum();node.addPassRequest(count);...} catch (PriorityWaitException ex) {node.increaseThreadNum();...} catch (BlockException e) {// Blocked, set block exception to current entry.context.getCurEntry().setBlockError(e);// 添加阻塞数node.increaseBlockQps(count);...throw e;} catch (Throwable e) {...}}

这里不看其他的,就看node.addPassRequest(count);这一步请求通过计数。

这里node继承自StatisticNode会掉到其addPassRequest方法

StatisticNode#addPassRequest()

public void addPassRequest(int count) {rollingCounterInSecond.addPass(count);rollingCounterInMinute.addPass(count);}

根据上面的数据结构,rollingCounterInSecond是ArrayMetric实例,

ArrayMetric#addPass

public void addPass(int count) {WindowWrap<MetricBucket> wrap = data.currentWindow();wrap.value().addPass(count);
}

这里data实例是LeapArray类型,currentWindow()方法获取当前实际窗口数据对象

LeapArray#currentWindow()

public WindowWrap<T> currentWindow() {return currentWindow(TimeUtil.currentTimeMillis());
}
public WindowWrap<T> currentWindow(long timeMillis) {if (timeMillis < 0) {return null;}//根据时间戳获取当前时间对应的窗口下标int idx = calculateTimeIdx(timeMillis);// 根据时间戳获取当前时间对应的窗口开始时间long windowStart = calculateWindowStart(timeMillis);/*从数组中获取指定时间点的桶(bucket)项。1、如果桶不存在,则创建一个新的桶,并使用 CAS(比较并交换)更新到环形数组中。2、如果桶匹配当前开始时间,直接返回该桶。3、如果桶已过期,则重置当前桶,并清理所有过期的桶。*/while (true) {WindowWrap<T> old = array.get(idx);if (old == null) {WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));if (array.compareAndSet(idx, null, window)) {// Successfully updated, return the created bucket.return window;} else {// Contention failed, the thread will yield its time slice to wait for bucket available.Thread.yield();}} else if (windowStart == old.windowStart()) {return old;} else if (windowStart > old.windowStart()) {if (updateLock.tryLock()) {try {// Successfully get the update lock, now we reset the bucket.return resetWindowTo(old, windowStart);} finally {updateLock.unlock();}} else {Thread.yield();}} else if (windowStart < old.windowStart()) {return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));}}}

这里有两个重要的方法calculateTimeIdx和calculateWindowStart

    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {//当前时间除以一个时间窗口的长度long timeId = timeMillis / windowLengthInMs;// Calculate current index so we can map the timestamp to the leap array.return (int)(timeId % array.length());}

这样时间每增加一个时间窗口长度,数值下标会往前推进1。

计数窗口开始时间,这个也好理解,其实就是求上个时间窗口的结束时间,

protected long calculateWindowStart(/*@Valid*/ long timeMillis) {return timeMillis - timeMillis % windowLengthInMs;
}

拿到时间窗口buck后WindowWrap<MetricBucket> 调用MetricBucket.addPass(count)增加计数。

实际情况分析:

sentinel默认的滑动窗口周期是1000毫秒,样本数是2。这样一个窗口的时间长度是 1000/2 = 500毫秒,LeapArray数组长度为2,存储两个样本窗口数据。

所属窗口编号 = (当前时间/500)%2

窗口开始时间= (当前时间 - 当前时间%500)

不同时间请求对应窗口信息:

请求编号请求时间所属窗口编号窗口开始时间
1000
220000
330000
46001500
58001500
6110001000
7160011500

这里看到LeapArray数组中两个元素会被循环使用,过去的窗口数据会被清空覆盖掉。

FlowSlot流量控制

数据统计好了,下一步就是根据流量数据统计进行控制

FlowSlot#entry()

    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {// 校验限流规则checkFlow(resourceWrapper, context, node, count, prioritized);//调用下一个slotfireEntry(context, resourceWrapper, node, count, prioritized, args);}

限流规则校验主要在FlowRuleChecker中完成。checkFlow()校验首先根据当前资源从FlowRuleManager获取适配当前资源的限流规则,然后逐一进行规则校验。

FlowRuleChecker#checkFlow

    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {if (ruleProvider == null || resource == null) {return;}//当前资源上配置的限流规则Collection<FlowRule> rules = ruleProvider.apply(resource.getName());if (rules != null) {//逐一校验for (FlowRule rule : rules) {if (!canPassCheck(rule, context, node, count, prioritized)) {throw new FlowException(rule.getLimitApp(), rule);}}}}

每一个FlowRule有一个对应的TrafficShapingController来执行判断。具体判断在canPass()方法中实现。

默认的策略是直接拒绝,DefaultController。其他的还有RateLimiterController, WarmUpController

DefaultController#canPass()

    public boolean canPass(Node node, int acquireCount, boolean prioritized) {//获取当前节点的请求数int curCount = avgUsedTokens(node);//count是阈值,如果大于阈值直接返回falseif (curCount + acquireCount > count) {if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {....}return false;}return true;}
private int avgUsedTokens(Node node) {if (node == null) {return DEFAULT_AVG_USED_TOKENS;}//计数所有有效的pass数量return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());}

流程算法总结

当一个请求进入 Sentinel 的流量统计 StatisticSlot 时:

  1. 获取当前时间。
  2. 通过 LeapArray.currentWindow() 方法获取当前时间对应的 WindowWrap<MetricBucket>
  3. 如果当前窗口是新窗口或过期窗口,会被重置。
  4. 在获取到的 MetricBucket 上,调用 pass.increment() 等方法,增加对应的指标计数。
  5. 在进入到FlowSlot进行限流判断时,会通过 LeapArray.values(currentTime) 获取所有有效(未过期)的 MetricBucket,然后遍历这些 MetricBucket,累加它们的 pass 计数,从而得到在滑动窗口内的总 QPS。
http://www.lryc.cn/news/598859.html

相关文章:

  • PCL 间接平差拟合球
  • Spring MVC 统一响应格式:ResponseBodyAdvice 从浅入深
  • 论文阅读:《针对多目标优化和应用的 NSGA-II 综述》一些关于优化算法的简介
  • 7.24 C/C++蓝桥杯 | 排序算法
  • 面试题(技术面+hr面)
  • Sklearn 机器学习 数值标准化
  • C++高效实现轨迹规划、自动泊车、RTS游戏、战术迂回包抄、空中轨迹、手术机器人、KD树
  • JSONObject相关知识点
  • 【MediaTek】AN7563编译出现npu/en7563/host/Makefile: No such file or directory
  • Silly Tavern 教程②:首次启动与基础设置
  • Windows 如何更改 ModelScope 的模型下载缓存位置?
  • 循环神经网络--LSTM模型
  • 跨境支付入门~国际支付结算(区块链篇)
  • 推荐系统如何开发
  • AI大模型资源
  • Spring Boot 遇上 MyBatis-Plus:高效开发的奇妙之旅
  • 10_Spring Boot 中的 @Scheduled 注解是单线程还是多线程?同步还是异步?
  • Percona pt-archiver 出现长事务
  • IntelliJ IDEA
  • 单片机的第一个程序—LED灯的控制
  • HBase + PostgreSQL + ElasticSearch 联合查询方案
  • 斐波那契数列策略
  • 新能源电池厂自动化应用:Modbus TCP转DeviceNet实践
  • Opencv C# 重叠 粘连 Overlap 轮廓分割 (不知道不知道)
  • C语言(长期更新)第5讲:数组练习(三)
  • windows11通过wsl安装Ubuntu到D盘,安装docker及宝塔面板
  • 【物联网】基于树莓派的物联网开发【16】——树莓派GPIO控制LED灯实验
  • 卫星物联网:使用兼容 Arduino 的全新 Iridium Certus 9704 开发套件深入探索
  • MSOP/DIFOP端口 vs. IP地址的关系以及每个IP下面有什么自己的东西
  • JavaSE:对一门面向对象语言有一个初步认识