当前位置: 首页 > news >正文

LiteHub中间件之限流实现

限流实现

    • 相关原理
      • 计数器算法:
      • 滑动窗口算法:
      • 漏桶算法:
      • 令牌桶算法:
    • 限流实现
    • 限流测试

相关原理

为什么要限流?

在开发高并发系统时,有三把利器用来保护系统:缓存、降级和限流。
其中,限流在很多场景中用来限制并发和请求量,比如说秒杀抢购,保护自身系统和下游系统不被巨型流量冲垮等。

常见的限流方法包括计数器、滑动窗口、漏桶和令牌桶算法。

计数器算法:

在一段时间间隔内(时间窗/时间区间,处理请求的最大数量固定,超过部分不做处理。
简单粗暴,比如指定线程池大小,指定数据库连接池大小、nginx连接数等,这都属于计数器算法。

计数器算法是限流算法里最简单也是最容易实现的一种算法。
举个例子,比如我们规定对于接口A,我们1分钟的访问次数不能超过100个。

计数器限流的做法是:
我们可以初始化一个计数器counter,每当收到请求时counter加1。若counter值超过100且当前请求与首个请求的时间间隔小于1分钟,则判定为请求过多并拒绝访问;若时间间隔超过1分钟且counter仍在限流范围内,则重置counter

但是这个方法存在一个显著的问题,攻击者可以在0:59的时候一次性发100个请求;到1:00就会将计数器清零,然后在1:01的时候再发100个请求;对于0:00-1:00和1:00-2:00这两个一分钟的请求都是100个请求,看起来是满足系统的要求的;但是在0:59-1:01这不足1分钟的时间段内,却发起了200个请求,可能会引起系统奔溃。

滑动窗口算法:

滑动窗口(rolling window)是一种时间分段技术。在计数器算法中,如果限制1分钟内的访问次数,这个1分钟就是一个固定时间窗口。而滑动窗口则是将这个固定窗口进一步细分成多个更小的时间单元。
在这里插入图片描述

例如:将1分钟的固定窗口划分为6个10秒的小窗口。整个红色矩形框代表一个大时间窗口,窗口会持续滑动,每10秒向右移动一格。假设在第一分钟的第59秒收到100个请求(落在灰色格子),第二分钟的1:00又收到100个请求(落在橘黄色格子)。此时滑动窗口检测到完整1分钟(红色框)内的总请求量达到200次,超过100次的限流阈值,就能及时触发限流机制。
滑动窗口划分得越精细,限流统计的准确性就越高,但过于精细会增加系统负担。

漏桶算法:

水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会超过桶可接纳的容量时直接溢出。
漏桶算法可以粗略的认为就是注水漏水过程,往桶中以任意速率流入水,以一定速率流出水,当水超过桶容量(capacity)则丢弃,因为桶容量是不变的,保证了整体的速率。
在这里插入图片描述
总结:漏桶算法通过一个固定容量和固定漏水速率的水桶模型,强制将任意输入流量整形为恒定速率输出,并在流量超过容量时丢弃请求,以此实现速率限制、流量平滑和系统保护。

令牌桶算法:

令牌桶是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌,填满了就丢弃令牌,请求是否被处理要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求。在流量低峰的时候,令牌桶会出现堆积,因此当出现瞬时高峰的时候,有足够多的令牌可以获取,令牌桶允许一定程度突发流量,只要有令牌就可以处理,支持一次拿多个令牌。令牌桶中装的是令牌。

在这里插入图片描述

特点:与漏桶算法相比,令牌桶算法允许短时间内的请求量激增(获得令牌后即可访问接口,可能出现瞬间消耗所有累积令牌的情况),但不会像计数算法那样产生过高峰值(因为令牌是匀速生成的)。因此,令牌桶算法在处理突发流量时表现更优。
部分内容和图片来源:
常见限流算法:计数器、滑动窗口、漏桶、令牌桶
限流:计数器、漏桶、令牌桶 三大算法的原理与实战(史上最全)

限流实现

本项目我是基于令牌桶实现的访问限流,请看以下代码
LimitMiddleware.h头文件

class LimitMiddleware: public Middleware 
{
public:// 构造函数// rate:令牌生成速率(个/秒)// capacity:桶最大容量(最多存多少个令牌)LimitMiddleware(int rate, int capacity);// 在请求处理前调用,用于限流void before(HttpRequest& request) override;void after(HttpResponse& response) override {};double gettokens() const {return tokens_;}
private:// 补充令牌(根据时间推移)void refillTokens();private:int rate_;            // 令牌生成速率(个/秒)int capacity_;        // 桶容量(最大令牌数)double tokens_;       // 当前可用令牌数(允许小数,更精确)std::chrono::steady_clock::time_point lastRefillTime_; // 上一次补充时间std::mutex mutex_;    // 保护多线程访问
};

LimitMiddleware.cc文件

using namespace std::chrono;// 令牌桶限流中间件
// 构造函数:指定令牌产生速率 rate (个/秒) 和桶容量 capacity (最多可存储多少令牌)
LimitMiddleware::LimitMiddleware(int rate, int capacity): rate_(rate),capacity_(capacity),tokens_(capacity),  // 初始化时桶是满的,令牌数等于容量lastRefillTime_(steady_clock::now())  // 记录上次补充令牌的时间
{
}// 令牌补充逻辑
// 根据距离上次补充的时间,按速率补充新令牌
void LimitMiddleware::refillTokens()
{auto now = steady_clock::now();auto elapsedMs = duration_cast<milliseconds>(now - lastRefillTime_).count();if (elapsedMs > 0){   // 按照速率计算可以补充的令牌数double newTokens = (elapsedMs / 1000.0) * rate_;// 桶中的令牌数不能超过容量上限tokens_ = std::min((double)capacity_, tokens_ + newTokens);// 更新时间戳lastRefillTime_ = now;}
}// 请求前执行:判断是否有足够的令牌
//如果有足够的令牌,进行下一步操作
//如果没有,拒绝访问,返回状态码429
void LimitMiddleware::before(HttpRequest& request)
{   // 加锁保证多线程安全std::lock_guard<std::mutex> lock(mutex_);// 先补充令牌refillTokens();if (tokens_ >= 1.0){tokens_ -= 1.0;// 有足够令牌,消费 1 个,允许请求通过}else{   // 没有足够令牌,请求被拒绝,抛出 429 响应HttpResponse resp;resp.setStatusLine(request.getVersion(), http::HttpResponse::k429TooManyRequests, "Too Many Requests");resp.setCloseConnection(true);resp.setContentType("application/json");resp.setContentLength(0);resp.setBody("Rate limit exceeded. Please try again later.");throw resp;}
}

核心逻辑是:

  • 按固定速率补充令牌(refillTokens
  • 请求到来时消费令牌(before
  • 没有令牌可用时返回(限流了,返回状态码429 Too Many Requests

限流测试

WebApps/LiteHubServer/src/LiteHubServer.cpp中的initializeMiddleware函数中,定义了限流中间件

 limitMiddleware_ = std::make_shared<http::middleware::LimitMiddleware>(1,100); // 每秒最多100个请求httpServer_.addMiddleware(limitMiddleware_);

这里定义的是一秒不超过100个请求,如果通过手动点击,这个1秒内怎么也到不了100次请求;所以我通过python脚本代码模拟一次大量的访问,python代码如下,

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from collections import Counter# -------------------------------
# 压测参数配置
# -------------------------------
TOTAL_REQUESTS = 150      # 总请求数
MAX_WORKERS = 3           # 并发线程数
REQUEST_INTERVAL = 0.02   # 相邻请求的间隔(秒),避免瞬间爆发
TARGET_URL = "http://47.122.77.97/"  # 目标 URL# -------------------------------
# 单次请求任务
# index: 请求编号
# -------------------------------
def send_request(index):try:start_time = time.time()# 发起 GET 请求r = requests.get(TARGET_URL, timeout=3)elapsed = time.time() - start_time# 打印日志:时间戳 + 请求序号 + 响应码 + 耗时print(f"[{time.strftime('%H:%M:%S')}] 请求 {index + 1:02d} --> 状态码: {r.status_code} (耗时: {elapsed:.2f}s)")return r.status_codeexcept Exception as e:# 异常时打印错误信息print(f"[{time.strftime('%H:%M:%S')}] 请求 {index + 1:02d} --> 失败: {str(e)}")return str(e)# -------------------------------
# 主程序入口
# -------------------------------
def main():print(f"开始压测,总请求数:{TOTAL_REQUESTS},最大并发数:{MAX_WORKERS}")results = []# 记录整个压测开始时间total_start_time = time.time()# 创建线程池with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:futures = []# 提交任务for i in range(TOTAL_REQUESTS):futures.append(executor.submit(send_request, i))time.sleep(REQUEST_INTERVAL)  # 控制相邻请求的间隔# 等待所有任务完成for future in as_completed(futures):results.append(future.result())# 记录整个压测结束时间total_elapsed = time.time() - total_start_time# -------------------------------# 统计与输出结果# -------------------------------print("\n-----------------------------")print("请求状态统计结果:")counts = Counter(results)for key, count in counts.items():print(f"{key}: {count} 次")print(f"\n压测总耗时: {total_elapsed:.2f} 秒")print("-----------------------------")if __name__ == "__main__":main()

执行python代码的结果如下:

在这里插入图片描述

在这里插入图片描述
可用看到总共有104次请求成功;46次拒绝访问。证明了设计的令牌桶限流策略有效。
我们将REQUEST_INTERVAL 调整的大一些,避免瞬间的大量请求

REQUEST_INTERVAL = 0.1   

再执行一次:
在这里插入图片描述
拒绝访问的次数也少了一些,从(46—>>>34,即减少了12次)。

下图为wireshark抓包到的429状态码响应:
在这里插入图片描述

http://www.lryc.cn/news/579849.html

相关文章:

  • git教程-pycharm使用tag打标签
  • 【JavaEE】计算机工作原理
  • 【IM项目笔记】1、WebSocket协议和服务端推送Web方案
  • Angular v20版本正式发布
  • Unity 中相机大小与相机矩形大小的关系
  • Android 网络请求优化全面指南
  • rs-agent论文精读
  • 第十五节:第四部分:特殊文件:XML的生成、约束(了解即可)
  • 【Modbus学习笔记】stm32实现Modbus
  • Python 闭包(Closure)实战总结
  • 万勋科技「柔韧机器人玻璃幕墙清洗」全国巡展@上海!引领清洗无人机智能化升级
  • 读商战数据挖掘:你需要了解的数据科学与分析思维05拟合数据
  • Windows系统下WSL从C盘迁移方案
  • Vue-19-前端框架Vue之应用基础组件通信(二)
  • 算法学习笔记:6.深度优先搜索算法——从原理到实战,涵盖 LeetCode 与考研 408 例题
  • 【办公类-54-07】20250901 2025学年第一学期班级点名册模版(双休国定假涂成灰色、修改标题和页眉,批量导出PDF)
  • 使用alist+RaiDrive+webdav将百度夸克网盘变为本地电脑磁盘方法教程
  • 基于微信小程序的校园二手交易平台、微信小程序校园二手商城源代码+数据库+使用说明,layui+微信小程序+Spring Boot
  • 如何搭建 OLAP 系统?OLAP与数据仓库有什么关系?
  • 推荐算法系统系列>推荐数据仓库集市的ETL数据处理
  • BLDC电机-运动控制---stm32时钟树定时器SYSTICKRTC的学习
  • Django Channels WebSocket实时通信实战:从聊天功能到消息推送
  • 前端查询条件加密传输方案(SM2加解密)
  • 浏览器(Chrome /Edge)高效使用 - 内部命令/快捷键/启动参数
  • 服务器如何配置防火墙规则以阻止恶意流量和DDoS攻击?
  • mybatisPlus分页方言设置错误问题 mybatisPlus对于Oceanbase的Oracle租户分页识别错误
  • HarmonyOS免密认证方案 助力应用登录安全升级
  • 使用循环抵消算法求解最小费用流问题
  • Python 制作 pyd(Windows 平台的动态链接库)
  • 【行云流水ai笔记】粗粒度控制:推荐CTRL、GeDi 细粒度/多属性控制:推荐TOLE、GPT-4RL