当前位置: 首页 > news >正文

Java Resilience4j-RateLimiter学习

一. 介绍

Resilience4j-RateLimiter 是 Resilience4j 中的一个限流模块,我们对 Resilience4j 的 CircuitBreaker、Retry 已经有了一定的了解,现在来学习 RateLimiter 限流器;

引入依赖;

<dependency><groupId>io.github.resilience4j</groupId><artifactId>resilience4j-ratelimiter</artifactId><!--jdk17对应的版本--><version>2.2.0</version>
</dependency>

二. 配置项

和 Retry 类似,RateLimiter 中也有一些配置项,对应 RateLimiterConfig 类的配置项;

RateLimiter 的配置项相比 CircuitBreaker、Retry 来说非常少,我们看下它的几个配置项;

  • limitRefreshPeriod:刷新限流的时间;
  • limitForPeriod:在刷新周期内的最大允许请求数,也就是最大 permission 数;
  • timeoutDuration:获取 permission 的最大等待时间,超过此时间的话则认为无法获取到 permission,需要进行限流;

三. 简单使用

我们模拟在一个主线程中循序执行逻辑,看是否触发限流,以及触发几次限流;

public class TestRateLimiter01 {public static void main(String[] args) {// 创建一个限流配置RateLimiterConfig config = RateLimiterConfig.custom().limitRefreshPeriod(Duration.ofSeconds(1))   // 每秒刷新限流.limitForPeriod(10)                          // 每秒允许的最大请求数.timeoutDuration(Duration.ofMillis(200))     // 获取 permission 的最大等待时间,200ms.build();RateLimiterRegistry registry = RateLimiterRegistry.custom().withRateLimiterConfig(config).build();RateLimiter rateLimiter = registry.rateLimiter("myRateLimiter");for (int i = 0; i < 23; i++) {try {rateLimiter.executeRunnable(() -> System.out.println("--" + System.currentTimeMillis()));} catch (RequestNotPermitted ex) {System.out.println("发生了限流" + System.currentTimeMillis());}}}
}

打印如下:

--1723888206695
--1723888206699
--1723888206699
--1723888206699
--1723888206699
--1723888206699
--1723888206699
--1723888206699
--1723888206699
--1723888206699
发生了限流1723888206903
发生了限流1723888207104
发生了限流1723888207309
发生了限流1723888207512
--1723888207698
--1723888207698
--1723888207698
--1723888207698
--1723888207698
--1723888207698
--1723888207698
--1723888207698
--1723888207698

可以看到出现了 4 次限流;

分析:

  1. 由于获取 permission 的最大等待时间是 200ms,permission 的刷新周期是 1 s,也就是 1000 ms;且我们只有一个主线程;前 10 次顺利执行后,所剩的 permission 为 0;
  2. 第 11 次请求,到下一个周期大概还有 800 ms,大于我们获取 permission 的最大等待时间 200 ms,此时获取不到 permission,阻塞等待 200 ms,并限流;以此类推;
  3. 第 15 次请求,到下一个周期大概还有 180 ms,小于我们获取 permission 的最大等待时间 200 ms,此时能够获取到 permission,需要阻塞等待 180 ms,等待下一个周期的到来;
  4. 第 16 - 23 次请求,正常调用;

四. 限流算法

我们先看官网的这张图;

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

Resilience4j 总共有两种实现:

  • 基于 Java 信号量(Semaphore-Based Rate Limiter)
  • 基于原子计数器(Atomic Rate Limiter)

原子计数器(Atomic Rate Limiter)是默认的实现,我们看 AtomicRateLimiter,有时间的话再了解基于信号量的算法;

上图就是 AtomicRateLimiter 的实现示意图,它通过 AtomicReference 管理其状态。 其中,AtomicRateLimiter.State 是不可变的,并且具有以下字段:

  • activeCycle:上一次调用使用的周期号;
  • activePermissions:上次调用后的可用权限数;如果可以保留某些权限,则可以为负;
  • nanosToWait:等待上一次呼叫的等待许可的纳秒数;

主要逻辑是:

  1. 将时间分成相等的部分,称为循环;在任何时候,我们都可以通过计算 currentTime / cyclePeriod 来确定当前周期;
  2. 如果我们知道限制器最后一次使用的当前周期数和周期,那么我们实际上可以计算出应该在限制器中出现多少个权限;
  3. 经过此计算后,如果可用权限还不够,我们可以通过减少当前权限并计算我们等待它出现的时间来判断执行权限保留;
  4. 经过所有计算后,我们可以产生一个新的限制器状态并将其存储在 AtomicReference 中;

五. 分析

1. executeRunnable()

我们直接从 RateLimiter.executeRunnable() 入手;

// ------------------------------------- RateLimiter ------------------------------------
default void executeRunnable(Runnable runnable) {// permits 为 1,即每次请求都获取一个 permitexecuteRunnable(1, runnable);
}// ------------------------------------- RateLimiter ------------------------------------
default void executeRunnable(int permits, Runnable runnable) {decorateRunnable(this, permits, runnable).run();
}// ------------------------------------- RateLimiter ------------------------------------
static Runnable decorateRunnable(RateLimiter rateLimiter, int permits, Runnable runnable) {return decorateCheckedRunnable(rateLimiter, permits, runnable::run).unchecked();
}// ------------------------------------- RateLimiter ------------------------------------
static CheckedRunnable decorateCheckedRunnable(RateLimiter rateLimiter, int permits,CheckedRunnable runnable) {return () -> {// 1. 等待获取 permissionwaitForPermission(rateLimiter, permits);try {// 2. 执行 runnablerunnable.run();// rateLimiter.onSuccess() 和 onError() 是统计用的,可以先不看rateLimiter.onSuccess();} catch (Exception exception) {rateLimiter.onError(exception);throw exception;}};
}

先等待获取 permission,只有获取到 permission 的情况下才能执行 runnable;waitForPermission() 是核心方法;

2. waitForPermission()

// ------------------------------------- RateLimiter ------------------------------------
static void waitForPermission(final RateLimiter rateLimiter, int permits) {// 1. 调用 rateLimiter.acquirePermission(permits) 来获取 permits 数量的 permission// 默认使用的 RateLimiter 是 AtomicRateLimiter,我们主要分析 AtomicRateLimiterboolean permission = rateLimiter.acquirePermission(permits);if (Thread.currentThread().isInterrupted()) {throw new AcquirePermissionCancelledException();}// 2. 如果获取失败,此时需要限流,抛出 RequestNotPermitted 异常if (!permission) {throw RequestNotPermitted.createRequestNotPermitted(rateLimiter);}
}

3. acquirePermission()

获取 permission 调用的是 RateLimiter.acquirePermission(int permits),我们主要看 AtomicRateLimiter(令牌桶限流);

// ------------------------------------- AtomicRateLimiter ------------------------------------
public boolean acquirePermission(final int permits) {// 1. timeoutInNacnos 为获取 permission 的最大等待时间long timeoutInNanos = state.get().config.getTimeoutDuration().toNanos();// 2. 获取下一个状态State modifiedState = updateStateWithBackOff(permits, timeoutInNanos);// 3. 看是否能获取到 permission,获取到返回 true,获取不到返回 falseboolean result = waitForPermissionIfNecessary(timeoutInNanos, modifiedState.nanosToWait);// 4. 发布事件publishRateLimiterAcquisitionEvent(result, permits);// 返回获取结果return result;
}

我们主要看第 2 步和第 3 步;

3.1 updateStateWithBackOff()

updateStateWithBackOff() 主要用于更新 State,通过 CAS 的方式更新 State;

// ------------------------------------- AtomicRateLimiter ------------------------------------
private AtomicRateLimiter.State updateStateWithBackOff(long timeoutInNanos) {AtomicRateLimiter.State prev;AtomicRateLimiter.State next;do {prev = (AtomicRateLimiter.State)this.state.get();// 执行 calculateNextState()next = this.calculateNextState(timeoutInNanos, prev);} while(!this.compareAndSet(prev, next));return next;
}

calculateNextState() 比较复杂,逻辑如下:

// ------------------------------------- AtomicRateLimiter ------------------------------------
private AtomicRateLimiter.State calculateNextState(long timeoutInNanos, AtomicRateLimiter.State activeState) {// 每个时间段对应纳秒数,由配置文件中的 limitRefreshPeriodInMillis 计算而来long cyclePeriodInNanos = activeState.config.getLimitRefreshPeriodInNanos();//每个时间段内可执行次数,对应配置文件中的limitForPeriodint permissionsPerCycle = activeState.config.getLimitForPeriod();// 计算从本类初始化到现在的纳秒数long currentNanos = this.currentNanoTime();// 计算当前 cycle 数 long currentCycle = currentNanos / cyclePeriodInNanos;long nextCycle = activeState.activeCycle;int nextPermissions = activeState.activePermissions;// 1. 如果已经进入后续的 cycle,重置 nextCycle 和 nextPermissions 值// nextPermissions 需要通过计算得到// 这是因为 activeState.activePermissions 会有赊账的情况,可能会存在负值// 所以 nextPermissions = Long.min(nextPermissions + nextState, permissionsPerCycle)long nextNanosToWait;if(nextCycle != currentCycle) {nextNanosToWait = currentCycle - nextCycle;long nextState = nextNanosToWait * permissionsPerCycle;nextCycle = currentCycle;nextPermissions = Long.min(nextPermissions + nextState, permissionsPerCycle);}// 2. 计算所需等待时间nextNanosToWait = this.nanosToWaitForPermission(cyclePeriodInNanos, permissionsPerCycle, nextPermissions, currentNanos, currentCycle);// 3. 需要根据 nextNanosToWait 和 timeoutInNanos 做对比// 所需时间和超时时间做对比,判断能否在能及时执行完AtomicRateLimiter.State nextState1 = this.reservePermissions(activeState.config, timeoutInNanos, nextCycle, nextPermissions, nextNanosToWait);return nextState1;
}
3.1.1 nanosToWaitForPermission()

我们看下 nanosToWaitForPermission() 的实现,逻辑为判断是否还有可用执行次数,如果还有次数则直接返回 0,表示不需要等待时间;

否则计算总共需要等待的时间,如果所需的 permits 过大,可能会导致需要等待很多个 cycle;对于我们正常使用来说,permits 一般都为 1,这里一般最多等待 nanosToNextCycle,即到下一个时间周期的剩余时间;

// ------------------------------------- AtomicRateLimiter ------------------------------------
private long nanosToWaitForPermission(final int permits, final long cyclePeriodInNanos,final int permissionsPerCycle,final int availablePermissions, final long currentNanos, final long currentCycle) {if (availablePermissions >= permits) {return 0L;}long nextCycleTimeInNanos = (currentCycle + 1) * cyclePeriodInNanos;long nanosToNextCycle = nextCycleTimeInNanos - currentNanos;int permissionsAtTheStartOfNextCycle = availablePermissions + permissionsPerCycle;int fullCyclesToWait = divCeil(-(permissionsAtTheStartOfNextCycle - permits),permissionsPerCycle);// 一般等待时间都为 nanosToNextCyclereturn (fullCyclesToWait * cyclePeriodInNanos) + nanosToNextCycle;
}

下述为了解内容;

1、如果 permits 过大,示例如下,需要等待一个周期 + nanosToNextCycle;

availablePermissions = 2
permits = 20
permissionsAtTheStartOfNextCycle = 2+10 = 12
fullCyclesToWait = divCeil (-(12-20), 10) = divCeil(8, 10) = 1

2、如果我们设置的 timeoutInNanos 过大,比如为 6 秒,可能会出现赊账严重,示例如下,需要等待两个周期 + nanosToNextCycle;所以我们尽量不要设置 timeoutInNanos 过大;

availablePermissions = -22
permits = 1
permissionsAtTheStartOfNextCycle = -22+10 = -12
fullCyclesToWait = divCeil (-(-12-1), 10) = divCeil(13, 10) = 2
3.1.2 reservePermissions()

我们再来看下 reservePermissions() 的实现;

根据 nextNanosToWait 和 timeoutInNanos 做对比,将所需时间和超时时间做对比,判断能否在能及时执行完;

  • timeoutInNanos >= nanosToWait:能及时执行完,可用次数 permission-1,同时更新 cycle、nanosToWait;返回新的 State 对象;
  • timeoutInNanos < nanosToWait:不能及时执行完,permission 不变,同时更新 cycle、nanosToWait;返回新的 State 对象;
// ------------------------------------- AtomicRateLimiter ------------------------------------
private State reservePermissions(final RateLimiterConfig config, final int permits,final long timeoutInNanos,final long cycle, final int permissions, final long nanosToWait) {boolean canAcquireInTime = timeoutInNanos >= nanosToWait;int permissionsWithReservation = permissions;if (canAcquireInTime) {permissionsWithReservation -= permits;}return new State(config, cycle, permissionsWithReservation, nanosToWait);
}

3.2 waitForPermissionIfNecessary()

// ------------------------------------- AtomicRateLimiter ------------------------------------
private boolean waitForPermissionIfNecessary(final long timeoutInNanos,final long nanosToWait) {boolean canAcquireImmediately = nanosToWait <= 0;boolean canAcquireInTime = timeoutInNanos >= nanosToWait;// 1. nanosToWait == 0 的情况,表示立即获取到了 permission,返回 trueif (canAcquireImmediately) {return true;}// 2. timeoutInNanos >= nanosToWait,表示需要等待 nacosToWait 到下一个时间周期// 调用线程会在此处阻塞等待 nanosToWait 时间,等待完成后返回 trueif (canAcquireInTime) {return waitForPermission(nanosToWait);}// 3. timeoutInNanos < nanosToWait,超过我们指定的获取 permission 的最大等待时间// 调用线程会在此处阻塞等待 timeoutInNanos 时间,等待完成后返回 false,表示获取失败,需要限流waitForPermission(timeoutInNanos);return false;
}
http://www.lryc.cn/news/427044.html

相关文章:

  • Nginx--地址重写Rewrite
  • webflux源码解析(1)-主流程
  • ipad作为扩展屏的最简单方式
  • 【卡码网Python基础课 17.判断集合成员】
  • 生物研究新范式!AI语言模型在生物研究中的应用
  • python语言day08 属性装饰器和property函数 异常关键字 约束
  • day01JS-数据类型-01
  • MATLAB 手动实现一种高度覆盖值提取建筑物点云的方法(74)
  • git的下载与安装(Windows)
  • 腾讯云AI代码助手 —— 编程新体验,智能编码新纪元
  • 使用 ESP32 和 TFT 屏幕显示实时天气信息 —— 基于 OpenWeatherMap API
  • 高阶数据结构——B树
  • Vue2中watch与Vue3中watch对比和踩坑
  • 在Java程序中执行Linux命令
  • 微信小程序在不同移动设备上的差异导致原因
  • 快速体验fastllm安装部署并支持AMD ROCm推理加速
  • 报错:java: javacTask: 源发行版 8 需要目标发行版 1.8
  • 【数据结构篇】~单链表(附源码)
  • 旋转图像(LeetCode)
  • 入门 - vue中v-model的实现原理和完整用法详解
  • 【区块链+金融服务】港融区域股权服务平台 | FISCO BCOS应用案例
  • Nginx反向代理和前后端分离项目打包部署
  • Spring 中ApplicationContext
  • python之时间 datetime、date、time、timedelta、dateutil
  • 【机器学习第11章——特征选择与稀疏学习】
  • LeetCode-day43-3137. K 周期字符串需要的最少操作次数
  • 基于springboot的智能家居系统
  • 【从问题中去学习k8s】k8s中的常见面试题(夯实理论基础)(七)
  • C:每日一练:单身狗(2.0版本)
  • 打破接口壁垒:适配器模式让系统无缝对接