当前位置: 首页 > news >正文

[k8s源码]6.reflector

Reflector 和 Informer 是 Kubernetes 客户端库中两个密切相关但职责不同的组件。Reflector 是一个较低级别的组件,主要负责与 Kubernetes API 服务器进行交互,执行资源的初始列表操作和持续的监视操作,将获取到的数据放入队列中。而 Informer 是一个更高级别的抽象,它内部使用了 Reflector,但提供了更全面的功能。Informer 不仅负责数据同步,还维护了资源对象的本地缓存,并提供了事件处理机制,允许开发者注册自定义的事件处理函数。

Informer 实际上通过一个称为 Controller 的内部组件来管理 Reflector。在这个架构中,Reflector 负责从 Kubernetes API 服务器获取数据并将其放入一个 DeltaFIFO queue。Controller 则从这个 queue 中取出数据,更新 Informer 的 LocalStore(这是一个持久化的缓存),并触发相应的事件处理器。这种设计允许 Informer 提供一个高级抽象,隐藏了底层的复杂性。虽然 Informer 不直接调用 Reflector 的方法,但它们通过 Controller 紧密集成。这种架构确保了各个组件职责单一:Reflector 专注于 API 交互,DeltaFIFO 管理变更队列,Controller 协调数据流,而 Informer 则提供高级 API 和缓存管理。这种设计使得开发者可以方便地使用 Informer,而无需直接处理 Reflector 或了解内部 queue 和 store 的细节。

以kubernetes源码中的reflector的一个测试为例:
执行这个test-reflector.go文件,这种test文件可以封装一些测试的代码,更好的帮助我们理解reflector的工作原理。


func TestReflectorListAndWatch(t *testing.T) {createdFakes := make(chan *watch.FakeWatcher)// The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc// to get called at the beginning of the watch with 1, and again with 3 when we// inject an error.expectedRVs := []string{"1", "3"}lw := &testLW{WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {rv := options.ResourceVersionfw := watch.NewFake()if e, a := expectedRVs[0], rv; e != a {t.Errorf("Expected rv %v, but got %v", e, a)}expectedRVs = expectedRVs[1:]// channel is not buffered because the for loop below needs to block. But// we don't want to block here, so report the new fake via a go routine.go func() { createdFakes <- fw }()return fw, nil},ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil},}s := NewFIFO(MetaNamespaceKeyFunc)r := NewReflector(lw, &v1.Pod{}, s, 0)go r.ListAndWatch(wait.NeverStop)ids := []string{"foo", "bar", "baz", "qux", "zoo"}var fw *watch.FakeWatcherfor i, id := range ids {if fw == nil {fw = <-createdFakes}sendingRV := strconv.FormatUint(uint64(i+2), 10)fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: sendingRV}})if sendingRV == "3" {// Inject a failure.fw.Stop()fw = nil}}// Verify we received the right ids with the right resource versions.for i, id := range ids {pod := Pop(s).(*v1.Pod)if e, a := id, pod.Name; e != a {t.Errorf("%v: Expected %v, got %v", i, e, a)}if e, a := strconv.FormatUint(uint64(i+2), 10), pod.ResourceVersion; e != a {t.Errorf("%v: Expected %v, got %v", i, e, a)}}if len(expectedRVs) != 0 {t.Error("called watchStarter an unexpected number of times")}
}

第一部分初始化一个通道createdFakes,初始化一个watcher,有两个方法,为watch和list。watch方法会创建watcher,并传递到createdFakes通道里面。这里的watchFunc是一个闭包:

闭包(Closure)是指一个函数可以捕获并“记住”其作用域外部的变量,即使这个变量在闭包函数的作用域之外。这种特性使得闭包能够访问和操作其外部函数中的变量。捕获外部变量:闭包可以捕获外部函数的局部变量,并且在闭包函数内使用这些变量,即使外部函数已经执行完毕。持久性:闭包会“记住”它捕获的变量,即使外部函数已经返回。状态共享:通过闭包,可以在不同的函数调用之间共享状态。

这里的fw是闭包里面创建的watcher,并不和外面的fw冲突,如果不理解,这里的fw也可以改为别的名字。

fw := watch.NewFake()
go func() { createdFakes <- fw }()

 然后初始化reflector

s := NewFIFO(MetaNamespaceKeyFunc)
r := NewReflector(lw, &v1.Pod{}, s, 0)
go r.ListAndWatch(wait.NeverStop)

然后下面是测试部分,首先为fw赋值为watcher,一开始为nil,而在上面r=NewReflector初始化的时候,就已经自动调用了watchFunc,创建了一个watcher。随后这个watcher被拿出来给了fw。sendingRV随机生成一个资源的版本,随着从ids拿出来的值,作为pod的属性用来创建pod,在测试中,fw.Add(...) 模拟了 Kubernetes API 服务器添加新 Pod 的行为。在实际环境中,当新 Pod 被创建时,Watch 操作会接收到这个事件。测试使用 fw.Add(...) 来模拟这个过程,向 FakeWatcher 发送一个新 Pod 被添加的事件。

 ids := []string{"foo", "bar", "baz", "qux", "zoo"}
    var fw *watch.FakeWatcher
    for i, id := range ids {
        if fw == nil {
            fw = <-createdFakes
        }
        sendingRV := strconv.FormatUint(uint64(i+2), 10)
        fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: sendingRV}})
        if sendingRV == "3" {
            // Inject a failure.
            fw.Stop()
            fw = nil
        }
    }

如果sendingRV=3,那么就会将fw停止并设为nil。这模拟了如果resourceVersion错误会发生什么,但是reflector已经设为neverStop,那么此时ListAndWatch会重新调用watchFunc,然后创建新的watcher,放入createdFakes中,从而fw可以重新拿取。 

http://www.lryc.cn/news/405064.html

相关文章:

  • 前台文本直接取数据库值doFieldSQL插入SQL
  • 【06】LLaMA-Factory微调大模型——微调模型评估
  • 数学建模学习(1)遗传算法
  • NumPy冷知识66个
  • Wi-SUN无线通信技术 — 大规模分散式物联网应用首选
  • 在 Ubuntu Server 22.04 上安装 Docker 的详细步骤
  • 前端使用 Konva 实现可视化设计器(18)- 素材嵌套 - 加载阶段
  • vue3 -layui项目-左侧导航菜单栏
  • Spring AOP(1)
  • 第1关 -- Linux 基础知识
  • tensorflow keras Model.fit returning: ValueError: Unrecognized data type
  • 虚拟机固定配置IP
  • 【Pytorch实用教程】pytorch中random_split用法的详细介绍
  • 第二讲:NJ网络配置
  • pytorch中常见的模型3种组织方式 nn.Sequential(OrderedDict)
  • 达梦数据库DM8-索引篇
  • 【中项】系统集成项目管理工程师-第4章 信息系统架构-4.5技术架构
  • 随机梯度下降 (Stochastic Gradient Descent, SGD)
  • TDengine 3.3.2.0 发布:新增 UDT 及 Oracle、SQL Server 数据接入
  • Ubuntu 24.04 LTS 无法打开Chrome浏览器
  • linux中RocketMQ安装(单机版)及springboot中的使用
  • 亚信安全终端一体化解决方案入选应用创新典型案例
  • Django视图与URLs路由详解
  • 怎么关闭 Windows 安全中心,手动关闭 Windows Defender 教程
  • 洛谷看不了别人主页怎么办
  • 邮件安全篇:企业电子邮件安全涉及哪些方面?
  • 软件测试09 自动化测试技术(Selenium)
  • 记录解决springboot项目上传图片到本地,在html里不能回显的问题
  • C++ 中 const 关键字
  • 客梯自动监测识别摄像机