Spring Cloud Alibaba Sentinel 源码阅读之流量控制算法
流量统计数据结构
几个重要的类
每一个资源的访问统计都被封装成一个StatisticNode用来记录各项指标
class StatisticNode{Metric rollingCounterInSecond = new ArrayMetric(2,1000);
}
每个StatisticNode中有一个ArrayMetric负责具体的数据统计
class ArrayMetric{LeapArray<MetricBucket> data = new OccupiableBucketLeapArray(2, 1000);
}
LeapArray
负责维护一个循环数组,每个元素代表一个时间窗口的数据
class LeapArray{protected int windowLengthInMs;//单个窗口的长度(毫秒)protected int sampleCount;// 窗口数量protected int intervalInMs;//总的统计时长 = windowLengthInMs * sampleCountprivate double intervalInSecond;protected final AtomicReferenceArray<WindowWrap<T>> array;public LeapArray(int sampleCount, int intervalInMs) {this.windowLengthInMs = intervalInMs / sampleCount;this.intervalInMs = intervalInMs;this.intervalInSecond = intervalInMs / 1000.0;this.sampleCount = sampleCount;this.array = new AtomicReferenceArray<>(sampleCount);}
}
WindowWrap: 是 LeapArray
数组中的一个元素,它包装了一个 MetricBucket
以及该窗口的开始时间戳。
public class WindowWrap<T> {private final long windowStart; // 窗口的开始时间private final int windowLength; // 窗口的长度private volatile T value; // 实际存储的 MetricBucket
}
MetricBucket: 存储在一个时间窗口内(例如 200ms)的各项指标,如通过请求数、拒绝请求数、异常数等。它内部使用 LongAdder
来保证并发安全地更新计数。
StatisticSlot流量统计
看完了数据结构,继续来看计数流程。开始位置还要冲slot的entry方法
StatisticSlot#entry()
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {try {//调用slot链fireEntry(context, resourceWrapper, node, count, prioritized, args);// 请求通过,增加线程数和请求通过数node.increaseThreadNum();node.addPassRequest(count);...} catch (PriorityWaitException ex) {node.increaseThreadNum();...} catch (BlockException e) {// Blocked, set block exception to current entry.context.getCurEntry().setBlockError(e);// 添加阻塞数node.increaseBlockQps(count);...throw e;} catch (Throwable e) {...}}
这里不看其他的,就看node.addPassRequest(count);这一步请求通过计数。
这里node继承自StatisticNode会掉到其addPassRequest方法
StatisticNode#addPassRequest()
public void addPassRequest(int count) {rollingCounterInSecond.addPass(count);rollingCounterInMinute.addPass(count);}
根据上面的数据结构,rollingCounterInSecond是ArrayMetric实例,
ArrayMetric#addPass
public void addPass(int count) {WindowWrap<MetricBucket> wrap = data.currentWindow();wrap.value().addPass(count);
}
这里data实例是LeapArray类型,currentWindow()方法获取当前实际窗口数据对象
LeapArray#currentWindow()
public WindowWrap<T> currentWindow() {return currentWindow(TimeUtil.currentTimeMillis());
}
public WindowWrap<T> currentWindow(long timeMillis) {if (timeMillis < 0) {return null;}//根据时间戳获取当前时间对应的窗口下标int idx = calculateTimeIdx(timeMillis);// 根据时间戳获取当前时间对应的窗口开始时间long windowStart = calculateWindowStart(timeMillis);/*从数组中获取指定时间点的桶(bucket)项。1、如果桶不存在,则创建一个新的桶,并使用 CAS(比较并交换)更新到环形数组中。2、如果桶匹配当前开始时间,直接返回该桶。3、如果桶已过期,则重置当前桶,并清理所有过期的桶。*/while (true) {WindowWrap<T> old = array.get(idx);if (old == null) {WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));if (array.compareAndSet(idx, null, window)) {// Successfully updated, return the created bucket.return window;} else {// Contention failed, the thread will yield its time slice to wait for bucket available.Thread.yield();}} else if (windowStart == old.windowStart()) {return old;} else if (windowStart > old.windowStart()) {if (updateLock.tryLock()) {try {// Successfully get the update lock, now we reset the bucket.return resetWindowTo(old, windowStart);} finally {updateLock.unlock();}} else {Thread.yield();}} else if (windowStart < old.windowStart()) {return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));}}}
这里有两个重要的方法calculateTimeIdx和calculateWindowStart
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {//当前时间除以一个时间窗口的长度long timeId = timeMillis / windowLengthInMs;// Calculate current index so we can map the timestamp to the leap array.return (int)(timeId % array.length());}
这样时间每增加一个时间窗口长度,数值下标会往前推进1。
计数窗口开始时间,这个也好理解,其实就是求上个时间窗口的结束时间,
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {return timeMillis - timeMillis % windowLengthInMs;
}
拿到时间窗口buck后WindowWrap<MetricBucket>
调用MetricBucket.addPass(count)增加计数。
实际情况分析:
sentinel默认的滑动窗口周期是1000毫秒,样本数是2。这样一个窗口的时间长度是 1000/2 = 500毫秒,LeapArray数组长度为2,存储两个样本窗口数据。
所属窗口编号 = (当前时间/500)%2
窗口开始时间= (当前时间 - 当前时间%500)
不同时间请求对应窗口信息:
请求编号 | 请求时间 | 所属窗口编号 | 窗口开始时间 |
---|---|---|---|
1 | 0 | 0 | 0 |
2 | 200 | 0 | 0 |
3 | 300 | 0 | 0 |
4 | 600 | 1 | 500 |
5 | 800 | 1 | 500 |
6 | 1100 | 0 | 1000 |
7 | 1600 | 1 | 1500 |
… |
这里看到LeapArray数组中两个元素会被循环使用,过去的窗口数据会被清空覆盖掉。
FlowSlot流量控制
数据统计好了,下一步就是根据流量数据统计进行控制
FlowSlot#entry()
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {// 校验限流规则checkFlow(resourceWrapper, context, node, count, prioritized);//调用下一个slotfireEntry(context, resourceWrapper, node, count, prioritized, args);}
限流规则校验主要在FlowRuleChecker中完成。checkFlow()校验首先根据当前资源从FlowRuleManager获取适配当前资源的限流规则,然后逐一进行规则校验。
FlowRuleChecker#checkFlow
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {if (ruleProvider == null || resource == null) {return;}//当前资源上配置的限流规则Collection<FlowRule> rules = ruleProvider.apply(resource.getName());if (rules != null) {//逐一校验for (FlowRule rule : rules) {if (!canPassCheck(rule, context, node, count, prioritized)) {throw new FlowException(rule.getLimitApp(), rule);}}}}
每一个FlowRule有一个对应的TrafficShapingController来执行判断。具体判断在canPass()方法中实现。
默认的策略是直接拒绝,DefaultController。其他的还有RateLimiterController
, WarmUpController
。
DefaultController#canPass()
public boolean canPass(Node node, int acquireCount, boolean prioritized) {//获取当前节点的请求数int curCount = avgUsedTokens(node);//count是阈值,如果大于阈值直接返回falseif (curCount + acquireCount > count) {if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {....}return false;}return true;}
private int avgUsedTokens(Node node) {if (node == null) {return DEFAULT_AVG_USED_TOKENS;}//计数所有有效的pass数量return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());}
流程算法总结
当一个请求进入 Sentinel 的流量统计 StatisticSlot
时:
- 获取当前时间。
- 通过
LeapArray.currentWindow()
方法获取当前时间对应的WindowWrap<MetricBucket>
。 - 如果当前窗口是新窗口或过期窗口,会被重置。
- 在获取到的
MetricBucket
上,调用pass.increment()
等方法,增加对应的指标计数。 - 在进入到FlowSlot进行限流判断时,会通过
LeapArray.values(currentTime)
获取所有有效(未过期)的MetricBucket
,然后遍历这些MetricBucket
,累加它们的pass
计数,从而得到在滑动窗口内的总 QPS。