限流算法详解
限流是我们经常会碰到的东西,顾名思义就是限制流量。它能保证我们的系统不会被突然的流量打爆,保证系统的稳定运行。像我们生活中,地铁就会有很多护栏,弯弯绕绕的,这个就是一种限流。像我们抢茅台,肯定大部分流量也是会被限流的,你的请求可能根本没进到下单等环节就被拦截了。
计数限流
最简单的限流算法就是计数限流了,例如系统能同时处理 100 个请求,保存一个计数器,处理了一个请求,计数器加一,一个请求处理完毕之后计数器减一。
每次请求来的时候看看计数器的值,如果超过阈值要么拒绝。
非常的简单粗暴,计数器的值要是存内存中就算单机限流算法。存中心存储里,例如 Redis 中,集群机器访问就算分布式限流算法。
优点就是:简单粗暴,单机在 Java 中可用 Atomic 等原子类、分布式就 Redis incr。
缺点就是:只有数量,没有时间的概念。一秒处理100个请求,和一分钟处理100个请求是不一样的。
固定窗口限流
它相比于计数限流主要是多了个时间窗口的概念。计数器每过一个时间窗口就重置。 规则是
在时间窗口内,累加访问次数,这个时间窗口过了之后,计数器清零;
缺点就是在临界时间如果出现突发流量的情况,会超过阈值。
——|——|——
0 1 2
滑动窗口限流
滑动窗口限流解决固定窗口临界值的问题,可以保证在任意时间窗口内都不会超过阈值。
相对于固定窗口,滑动窗口除了需要引入计数器之外还需要记录时间窗口内每个请求到达的时间点,因此对内存的占用会比较多。
规则如下,假设时间窗口为 1 秒:
-
记录每次请求的时间
-
统计每次请求的时间 至 往前推 1 秒这个时间窗口内请求数,并且 1 秒前的数据可以删除。
-
统计的请求数小于阈值就记录这个请求的时间,并允许通过,反之拒绝。
但是滑动窗口和固定窗口都无法解决短时间之内集中流量的突击。
我们所想的限流场景,例如每秒限制 100 个请求。希望请求每 10ms 来一个,这样我们的流量处理就很平滑,但是真实场景很难控制请求的频率。因此可能存在 5ms 内就打满了阈值的情况。
当然对于这种情况还是有变型处理的,例如设置多条限流规则。不仅限制每秒 100 个请求,再设置每 10ms 不超过 2 个。
漏桶算法
如下图所示,水滴持续滴入漏桶中,底部定速流出。如果水滴滴入的速率大于流出的速率,当存水超过桶的大小的时候就会溢出。
规则如下:
-
请求来了放入桶中
-
桶内请求量满了拒绝请求
-
服务定速从桶内拿请求处理
优点既是缺点,流量处理太过平滑。面对突发请求,服务的处理速度和平时是一样的,这其实不是我们想要的,在面对突发流量我们希望在系统平稳的同时,提升用户体验即能更快的处理请求,而不是和正常流量一样,循规蹈矩的处理
令牌桶算法
令牌桶其实和漏桶的原理类似,只不过漏桶是定速地流出,而令牌桶是定速地往桶里塞入令牌,然后请求只有拿到了令牌才能通过,之后再被服务器处理。
当然令牌桶的大小也是有限制的,假设桶里的令牌满了之后,定速生成的令牌会丢弃。
规则:
-
定速的往桶内放入令牌
-
令牌数量超过桶的限制,丢弃
-
请求来了先向桶内索要令牌,索要成功则通过被处理,反之拒绝
一般而言我们不需要自己实现限流算法来达到限流的目的,不管是接入层限流还是细粒度的接口限流其实都有现成的轮子使用,其实现也是用了上述我们所说的限流算法。
比如Google Guava
提供的限流工具类 RateLimiter
,是基于令牌桶实现的,并且扩展了算法,支持预热功能。
阿里开源的限流框架 Sentinel
中的匀速排队限流策略,就采用了滑动窗口。
Nginx 中的限流模块 limit_req_zone
,采用了漏桶算法,还有 OpenResty 中的 resty.limit.req
库等等。
Guava如何实现令牌桶算法
根据令牌桶的原理,我们可能第一直觉会觉得使用生产者-消费者模式,一个线程定时向阻塞队列添加令牌,而请求作为消费线程去获取令牌。如果并发量不大的情况,这个实现没有什么问题。但一般情况下,使用限流都是高并发的场景,而且系统压力已经临界极限了,这个时候cpu忙碌,放令牌的线程可能没法被及时唤醒,造成放令牌延迟,同时定时器会创建调度线程,也会对系统性能产生影响。
所以Guava没有使用定时线程。它的办法很简单,就是通过经过的时间来判断有多少令牌产生,并保存下目前的令牌数。
举个例子,假设限制1s一个请求。启动之后,刚开始没有请求,到10秒的时候来了个请求。因为过了10s,所以就可以判断有10个令牌产生,然后消耗一个,剩9个令牌,记录下来。又过了10s再来了一个请求。那么又有10个令牌产生,消耗一个,还剩18个令牌。
通过这种简单的方式,就可以实现令牌桶了。
public double acquire(int permits) {long microsToWait = reserve(permits); // sleep的时间stopwatch.sleepMicrosUninterruptibly(microsToWait);return 1.0 * microsToWait / SECONDS.toMicros(1L);}
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {resync(nowMicros);long returnValue = nextFreeTicketMicros;// 需要消耗储存的令牌的数量double storedPermitsToSpend = min(requiredPermits, this.storedPermits);// 需要等待新创建的令牌的数量double freshPermits = requiredPermits - storedPermitsToSpend;// 需要等待的时间long waitMicros =storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)+ (long) (freshPermits * stableIntervalMicros);// 下次令牌生成的时间this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);this.storedPermits -= storedPermitsToSpend;return returnValue;
/*** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.*/void resync(long nowMicros) {// if nextFreeTicket is in the past, resync to nowif (nowMicros > nextFreeTicketMicros) {double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();// 存储新生成的令牌storedPermits = min(maxPermits, storedPermits + newPermits);nextFreeTicketMicros = nowMicros;}}
sentinel滑动窗口算法
@Overridepublic void addPass(int count) {WindowWrap<MetricBucket> wrap = data.currentWindow();wrap.value().addPass(count);}
public WindowWrap<T> currentWindow(long timeMillis) {if (timeMillis < 0) {return null;}int idx = calculateTimeIdx(timeMillis);// Calculate current bucket start time.long windowStart = calculateWindowStart(timeMillis);/** Get bucket item at given time from the array.** (1) Bucket is absent, then just create a new bucket and CAS update to circular array.* (2) Bucket is up-to-date, then just return the bucket.* (3) Bucket is deprecated, then reset current bucket.*/while (true) {WindowWrap<T> old = array.get(idx);if (old == null) {WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));if (array.compareAndSet(idx, null, window)) {// Successfully updated, return the created bucket.return window;} else {// Contention failed, the thread will yield its time slice to wait for bucket available.Thread.yield();}} else if (windowStart == old.windowStart()) {return old;} else if (windowStart > old.windowStart()) {if (updateLock.tryLock()) {try {// Successfully get the update lock, now we reset the bucket.return resetWindowTo(old, windowStart);} finally {updateLock.unlock();}} else {// Contention failed, the thread will yield its time slice to wait for bucket available.Thread.yield();}} else if (windowStart < old.windowStart()) {// Should not go through here, as the provided time is already behind.return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));}}}