当前位置: 首页 > news >正文

限流算法,基于go的gRPC 实现的

目录

一、单机限流

1、令牌桶算法

3、固定窗口限流算法

4、滑动窗口

二、集群限流

1、分布式固定窗口 (基于redis)

2、分布式滑动窗口


一、单机限流

1、令牌桶算法

令牌桶算法是当流量进入系统前需要获取令牌,没有令牌那么就要进行限流

这个算法是怎么实现的呢

  1. 定义一个后台协程按照一定的频率去产生token

  2. 后台协程产生的token 放到固定大小容器里面

  3. 有流量进入系统尝试拿到token,没有token 就需要限流了


type TokenBucketLimiter struct {token chan struct{}stop  chan struct{}
}
​
func NewTokenBucket(capactity int, timeInternal time.Duration) *TokenBucketLimiter {te := make(chan struct{}, capactity)stop := make(chan struct{})ticker := time.NewTicker(timeInternal)go func() {defer ticker.Stop()for {select {case <-ticker.C:select {case te <- struct{}{}:default:
​}case <-stop:return}}}()return &TokenBucketLimiter{token: te,stop:  stop,}
}
​
func (t *TokenBucketLimiter) BuildServerInterceptor() grpc.UnaryServerInterceptor {return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {select {case <-ctx.Done():err = ctx.Err()returncase <-t.token:return handler(ctx, req)case <-t.stop:err = errors.New("缺乏保护")return}}
}
​
func (t *TokenBucketLimiter) Stop() {close(t.stop)
}

3、固定窗口限流算法

什么是固定窗口限流算法

固定窗口限流算法(Fixed Window Rate Limiting Algorithm)是一种最简单的限流算法,其原理是在固定时间窗口(单位时间)内限制请求的数量。该算法将时间分成固定的窗口,并在每个窗口内限制请求的数量。具体来说,算法将请求按照时间顺序放入时间窗口中,并计算该时间窗口内的请求数量,如果请求数量超出了限制,则拒绝该请求。

优点:实现简单

缺点:对于瞬时流量没发处理,也就是临界问题,比如下图在20t前后,在16t以及26t有大量流量进来,在这10t中,已经超过了流量限制,没法限流

实现如下

type fixWindow1 struct {lastVistTime int64vistCount    int64interval     int64maxCount     int64
}
​
func NewfixWindow1(macCount int64) *fixWindow1 {t := &fixWindow1{maxCount: macCount,}return t
}
​
func (f *fixWindow1) FixWindow1() grpc.UnaryServerInterceptor {return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {current := time.Now().UnixNano()lasttime := atomic.LoadInt64(&f.lastVistTime)if lasttime+f.interval > current {if atomic.CompareAndSwapInt64(&f.lastVistTime, lasttime, current) {atomic.StoreInt64(&f.lastVistTime, current)atomic.StoreInt64(&f.maxCount, 0)}}count := atomic.AddInt64(&f.vistCount, 1)if count > f.maxCount {return gen.GetByIDResp{}, errors.New("触发限流")}return handler(ctx, req)}
}

4、滑动窗口

什么是滑动窗口算法:

滑动窗口限流算法是一种常用的限流算法,用于控制系统对外提供服务的速率,防止系统被过多的请求压垮。它将单位时间周期分为n个小周期,分别记录每个小周期内接口的访问次数,并且根据时间滑动删除过期的小周期。它可以解决固定窗口临界值的问题

type slideWindow struct {
timeWindow *list.Listinterval   int64maxCnt     intlock       sync.Mutex
}
​
func NewSlideWindow(interval time.Duration, maxCnt int) *slideWindow {t := &slideWindow{timeWindow: list.New(),interval:   interval.Nanoseconds(),maxCnt:     maxCnt,}return t
}
​
func (s *slideWindow) SlideWinowlimit() grpc.UnaryServerInterceptor {return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {s.lock.Lock()now := time.Now().UnixNano()// 快路径if s.timeWindow.Len() < s.maxCnt {resp, err = handler(ctx, req)s.timeWindow.PushBack(now)s.lock.Unlock()return}front := s.timeWindow.Front()for front != nil && front.Value.(int64)+s.interval < now {s.timeWindow.Remove(front)front = s.timeWindow.Front()}if s.timeWindow.Len() >= s.maxCnt {s.lock.Unlock()return &gen.GetByIdReq{}, errors.New("触发限流")}s.lock.Unlock()resp, err = handler(ctx, req)s.timeWindow.PushBack(now)return}
}

二、集群限流

下面是分布式限流,为啥是分布式限流,单机限流只能对单台服务器进行限流,没发对集权进行限流,需要用分布式限流来进行集权限流

1、分布式固定窗口 (基于redis)
type redisFix struct {
serName  stringinterVal intlimitCnt intredis    redis.Cmdable
}
​
//go:embed lua/fixwindow.lua
var lua_redis_fix string
​
func NewRedisFix(serName string, interval int, limitCnt int, redis redis.Cmdable) *redisFix {t := &redisFix{serName:  serName,interVal: interval,limitCnt: limitCnt,redis:    redis,}return t
}
​
func (r *redisFix) RedisFix() grpc.UnaryServerInterceptor {return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {res, err := r.limit(ctx)if err != nil {return &gen.GetByIDResp{}, err}if res {return &gen.GetByIdReq{}, errors.New("触发限流")}return handler(ctx, req)}
}
​
func (r *redisFix) limit(ctx context.Context) (res bool, err error) {keys := []string{r.serName}res, err = r.redis.Eval(ctx, lua_redis_fix, keys, r.interVal, r.limitCnt).Bool()return
}

lua

local key = KEYS[1]

local limitCnt = tonumber(ARGV[2])
local val = redis.call('get',key)
if val==false thenif limitCnt<1 thenreturn "true"elseredis.call('set',key,1,'PX',ARGV[1])return "false"end
elseif tonumber(val)<limitCnt thenredis.call('incr',key)return "false"
elsereturn "true"
end
2、分布式滑动窗口
//go:embed lua/slidewindow.lua

var slideWindLua string
​
type redisSlib struct {serverName stringinterVal   time.DurationmaxCnt     int64redis      redis.Cmdable
}
​
func NewRedisSlib(interval time.Duration, maxCnt int64, serverName string, clientCmd redis.Cmdable) *redisSlib {t := &redisSlib{serverName: serverName,interVal:   interval,maxCnt:     maxCnt,redis:      clientCmd,}return t
}
​
func (r *redisSlib) RedisSlibLimt() grpc.UnaryServerInterceptor {return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {limt, err := r.limt(ctx)if err != nil {return nil, err}if limt {return nil, errors.New("限流")}return handler(ctx, req)}
}
​
func (r *redisSlib) limt(ctx context.Context) (bool, error) {now := time.Now().UnixMilli()return r.redis.Eval(ctx, slideWindLua, []string{r.serverName}, r.interVal.Milliseconds(), r.maxCnt, now).Bool()
}

lua

local key = KEYS[1]
local window = tonumber(ARGV[1])
local maxCnt = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
​
--- 窗口的最小边界
local min = now-window
​
redis.call('ZREMRANGEBYSCORE',key,'-inf',min)
​
local cnt = redis.call('ZCOUNT',key,'-inf','+inf')
​
if cnt>=maxCnt thenreturn "true"
elseredis.call('ZADD',key,now,now)redis.call('PEXPIRE',key,window)return "false"
end

http://www.lryc.cn/news/254252.html

相关文章:

  • Shell中HTTP变量和文本处理
  • java学习part39map
  • 使用sqoop操作HDFS与MySQL之间的数据互传
  • Kafka使用指南
  • HarmonyOS4.0从零开始的开发教程03初识ArkTS开发语言(中)
  • 西工大计算机学院计算机系统基础实验一(函数编写1~10)
  • VMware 虚拟机 电脑重启后 NAT 模式连不上网络问题修复
  • 【桑基图】绘制桑基图
  • ACM32F403/F433 12 位多通道,支持 MPU 存储保护功能,应用于工业控制,智能家居等产品中
  • 7. 从零用Rust编写正反向代理, HTTP及TCP内网穿透原理及运行篇
  • UE4.27-UE5.1设置打包Android环境
  • MySQL授权密码
  • 0X05
  • Doris优化总结
  • 案例059:基于微信小程序的在线投稿系统
  • 利用STM32内置Bootloader实现USB DFU固件升级
  • Centos7如何安装MySQL
  • VR远程带看,助力线下门店线上化转型“自救”
  • 算法通关村第十七关-白银挑战贪心算法高频题目
  • 【数据结构】动态规划(Dynamic Programming)
  • Redis key过期删除机制实现分析
  • ElasticSearch 谈谈分词与倒排索引的原理
  • 【Java】Java8重要特性——Lambda函数式编程以及Stream流对集合数据的操作
  • 大话数据结构-查找-散列表查找(哈希表)
  • 持续集成交付CICD:Sonarqube自动更新项目质量配置
  • Linux设置Docker自动创建Nginx容器脚本
  • 技术博客:Vue中各种混淆用法汇总
  • 【python】Python生成GIF动图,多张图片转动态图,pillow
  • python/matlab图像去雾/去雨综述
  • Docker+jenkins+gitlab实现持续集成