当前位置: 首页 > news >正文

k8s API限流——server级别整体限流和客户端限流

1. 背景

为了防止突发流量影响apiserver可用性,k8s支持多种限流配置,包括:

  • MaxInFlightLimit,server级别整体限流
  • Client限流
  • EventRateLimit, 限制event
  • APF,更细力度的限制配置

1.1 MaxInFlightLimit限流

  • apiserver默认可设置最大并发量(集群级别,区分只读与修改操作)
  • 通过参数–max-requests-inflight代表只读请求
  • –max-mutating-requests-inflight代表修改请求
  • 可以简单实现限流。

1.1.1 源码解读

  • 入口 GenericAPIServer.New中的添加hook

     	// FlowControl为nil ,代表未启用 APF,API 服务器中的整体并发量将受到 kube-apiserver 的参数 --max-requests-inflight 和 --max-mutating-requests-inflight 的限制。if c.FlowControl != nil {const priorityAndFairnessFilterHookName = "priority-and-fairness-filter"if !s.isPostStartHookRegistered(priorityAndFairnessFilterHookName) {err := s.AddPostStartHook(priorityAndFairnessFilterHookName, func(context PostStartHookContext) error {genericfilters.StartPriorityAndFairnessWatermarkMaintenance(context.StopCh)return nil})if err != nil {return nil, err}}} else {const maxInFlightFilterHookName = "max-in-flight-filter"if !s.isPostStartHookRegistered(maxInFlightFilterHookName) {err := s.AddPostStartHook(maxInFlightFilterHookName, func(context PostStartHookContext) error {genericfilters.StartMaxInFlightWatermarkMaintenance(context.StopCh)return nil})if err != nil {return nil, err}}}// StartMaxInFlightWatermarkMaintenance starts the goroutines to observe and maintain watermarks for max-in-flight
    // requests.
    func StartMaxInFlightWatermarkMaintenance(stopCh <-chan struct{}) {startWatermarkMaintenance(watermark, stopCh)
    }// startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark.
    func startWatermarkMaintenance(watermark *requestWatermark, stopCh <-chan struct{}) {// 定期更新inflight使用指标go wait.Until(func() {watermark.lock.Lock()readOnlyWatermark := watermark.readOnlyWatermarkmutatingWatermark := watermark.mutatingWatermarkwatermark.readOnlyWatermark = 0watermark.mutatingWatermark = 0watermark.lock.Unlock()metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)}, inflightUsageMetricUpdatePeriod, stopCh)// 定期观察watermarks。这样做是为了确保他们不会落后太多。当他们//落后太多时,在响应接收到的下一个请求时会有很长的延迟,而观察者//会赶上来。go wait.Until(func() {watermark.readOnlyObserver.Add(0)watermark.mutatingObserver.Add(0)}, observationMaintenancePeriod, stopCh)
    }
  • WithMaxInFlightLimit代表限流处理函数

调用入口: staging\src\k8s.io\apiserver\pkg\server\config.go

DefaultBuildHandlerChain中,判断FlowControl为nil就开启WithMaxInFlightLimit,

if c.FlowControl != nil {requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get)handler = filterlatency.TrackCompleted(handler)handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)handler = filterlatency.TrackStarted(handler, "priorityandfairness")} else {handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)}func WithMaxInFlightLimit(handler http.Handler,nonMutatingLimit int,mutatingLimit int,longRunningRequestCheck apirequest.LongRunningRequestCheck,
) http.Handler {// 如果limit num为0就不开启限流了if nonMutatingLimit == 0 && mutatingLimit == 0 {return handler}var nonMutatingChan chan boolvar mutatingChan chan bool// 构造限流的chan,类型为长度=limit的 bool chanif nonMutatingLimit != 0 {nonMutatingChan = make(chan bool, nonMutatingLimit)klog.V(2).InfoS("Initialized nonMutatingChan", "len", nonMutatingLimit)} else {klog.V(2).InfoS("Running with nil nonMutatingChan")}if mutatingLimit != 0 {mutatingChan = make(chan bool, mutatingLimit)klog.V(2).InfoS("Initialized mutatingChan", "len", mutatingLimit)} else {klog.V(2).InfoS("Running with nil mutatingChan")}initMaxInFlight(nonMutatingLimit, mutatingLimit)// 发起请求return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {ctx := r.Context()requestInfo, ok := apirequest.RequestInfoFrom(ctx)if !ok {handleError(w, r, fmt.Errorf("no RequestInfo found in context, handler chain must be wrong"))return}// 检查是否是长时间运行的请求if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) {handler.ServeHTTP(w, r)return}。。。。。。。。
}// LongRunningRequestCheck is a predicate which is true for long-running http requests.
type LongRunningRequestCheck func(r *http.Request, requestInfo *RequestInfo) bool

使用BasicLongRunningRequestCheck检查是否是watch或者pprof debug等长时间运行的请求,因为这些请求不受限制,位置

func BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) apirequest.LongRunningRequestCheck {return func(r *http.Request, requestInfo *apirequest.RequestInfo) bool {if longRunningVerbs.Has(requestInfo.Verb) {return true}if requestInfo.IsResourceRequest && longRunningSubresources.Has(requestInfo.Subresource) {return true}if !requestInfo.IsResourceRequest && strings.HasPrefix(requestInfo.Path, "/debug/pprof/") {return true}return false}
}

检查是只读操作还是修改操作,决定使用哪个chan限制

		var c chan boolisMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)if isMutatingRequest {c = mutatingChan} else {c = nonMutatingChan}

如果队列未满,有空位置,则更新排队数字

  • 使用select 向c中写入true,如果能写入到说明队列未满
  • 记录下对应的指标
		select {case c <- true:// We note the concurrency level both while the// request is being served and after it is done being// served, because both states contribute to the// sampled stats on concurrency.if isMutatingRequest {watermark.recordMutating(len(c))} else {watermark.recordReadOnly(len(c))}// default代表队列已满defer func() {<-cif isMutatingRequest {watermark.recordMutating(len(c))} else {watermark.recordReadOnly(len(c))}}()handler.ServeHTTP(w, r)

但是如果请求的group中含有 system:masters,则放行, 因为apiserver认为这个组是很重要的请求,不能被限流.

  • group=system:masters 对应的clusterRole 为cluster-admin, 队列已满,如果请求的group中没有 system:masters,则返回http 429错误,并且丢弃请求
				// at this point we're about to return a 429, BUT not all actors should be rate limited.  A system:master is so powerful// that they should always get an answer.  It's a super-admin or a loopback connection.if currUser, ok := apirequest.UserFrom(ctx); ok {for _, group := range currUser.GetGroups() {if group == user.SystemPrivilegedGroup {handler.ServeHTTP(w, r)return}}}
  • http 429 代表当前有太多请求了,请重试,并设置 response 的header Retry-After =1
// We need to split this data between buckets used for throttling.metrics.RecordDroppedRequest(r, requestInfo, metrics.APIServerComponent, isMutatingRequest)metrics.RecordRequestTermination(r, requestInfo, metrics.APIServerComponent, http.StatusTooManyRequests)tooManyRequests(r, w)func tooManyRequests(req *http.Request, w http.ResponseWriter) {// Return a 429 status indicating "Too Many Requests"w.Header().Set("Retry-After", retryAfter)http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
}

1.2 Client限流

client-go默认的qps为5,但是只支持客户端限流,只能由各个发起端限制

  • 集群管理员无法控制用户行为。
http://www.lryc.cn/news/45236.html

相关文章:

  • 在华为做了三年软件测试被裁了,我该怎么办
  • Spring cloud 限流的多种方式
  • Linux命令·top
  • springmvc之系列文章
  • Matlab实现深度学习(附上完整仿真源码)
  • 我的谷歌书签
  • day3 数据库技术考点汇总
  • 学剪辑难吗 如何使用会声会影2023做剪辑视频
  • django学习日记
  • 在线教学视频课程如何防止学员挂机?
  • 【Redis】安装配置
  • ChatGPT批量生成文章-ChatGPT文章生成器
  • Linux命令 ——sed
  • C++常用字符串string方法
  • XML树结构和语法
  • 【Qt】Qt单元测试详解(四):Google Test 断言
  • 句柄和指针的区别
  • Linux 网络编程学习笔记——十四、多线程编程
  • JS 获取时区
  • 【0183】PG内核客户端认证之将读取的token创建HbaToken(3 - 1)
  • 别把 OpenAI 太当回事,它远未达到替换前端的地步
  • 前端基础HTML、CSS--8(CSS-5)
  • 基于ASP网络办公OA系统的设计与实现
  • C语言计算机二级/C语言期末考试 刷题(五)
  • 2023-04-03 grafana-源码编译启动及添加prometheus数据源
  • 微软New Bing(GPT-4)写的Delphi诗歌
  • 【进程地址空间】
  • 递归dfs入门
  • 华为OD机试用java实现 -【吃火锅】
  • AI创作优美文章的秘密大揭秘!