当前位置: 首页 > news >正文

如何借助Kafka持久化存储K8S事件数据?

大家应该对 Kubernetes Events 并不陌生,特别是当你使用 kubectl describe 命令或 Event API 资源来了解集群中的故障时。
 

$ kubectl get events15m         Warning   FailedCreate                                                                                                      replicaset/ml-pipeline-visualizationserver-865c7865bc    Error creating: pods "ml-pipeline-visualizationserver-865c7865bc-" is forbidden: error looking up service account default/default-editor: serviceaccount "default-editor" not found

 

尽管这些信息十分有用,但它只是临时的,保留时间最长为30天。如果出于审计或是故障诊断等目的,你可能想要把这些信息保留得更久,比如保存在像 Kafka 这样更持久、高效的存储中。然后你可以借助其他工具(如 Argo Events)或自己的应用程序订阅 Kafka 主题来对某些事件做出响应。
 

构建K8s事件处理链路

我们将构建一整套 Kubernetes 事件处理链路,其主要构成为:

  • Eventrouter,开源的 Kubernetes event 处理器,它可以将所有集群事件整合汇总到某个 Kafka 主题中。
  • Strimzi Operator,在 Kubernetes 中轻松管理 Kafka broker。
  • 自定义 Go 二进制文件以将事件分发到相应的 Kafka 主题中。
     

为什么要把事件分发到不同的主题中?比方说,在集群的每个命名空间中存在与特定客户相关的 Kubernetes 资产,那么在使用这些资产之前你当然希望将相关事件隔离开。
 

本示例中所有的配置、源代码和详细设置指示都已经放在以下代码仓库中:
https://github.com/esys/kube-events-kafka
 

 

创建 Kafka broker 和主题

我选择使用 Strimzi(strimzi.io/) 将 Kafka 部署到 Kubernetes 中。简而言之,它是用于创建和更新 Kafka broker 和主题的。你可以在官方文档中找到如何安装该 Operator 的详细说明:
https://strimzi.io/docs/operators/latest/overview.html
 

首先,创建一个新的 Kafka 集群:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:name: kube-events
spec:entityOperator:topicOperator: {}userOperator: {}kafka:config:default.replication.factor: 3log.message.format.version: "2.6"offsets.topic.replication.factor: 3transaction.state.log.min.isr: 2transaction.state.log.replication.factor: 3listeners:- name: plainport: 9092tls: falsetype: internal- name: tlsport: 9093tls: truetype: internalreplicas: 3storage:type: jbodvolumes:- deleteClaim: falseid: 0size: 10Gitype: persistent-claimversion: 2.6.0zookeeper:replicas: 3storage:deleteClaim: falsesize: 10Gitype: persistent-claim

 

然后创建 Kafka 主题来接收我们的事件:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:name: cluster-events
spec:config:retention.ms: 7200000segment.bytes: 1073741824partitions: 1replicas: 1

 

设置 EventRouter

在本教程中使用 kubectl apply 命令即可,我们需要编辑 router 的配置,以指明我们的 Kafka 端点和要使用的主题:

apiVersion: v1
data:config.json: |-{"sink": "kafka","kafkaBrokers": "kube-events-kafka-bootstrap.kube-events.svc.cluster.local:9092","kafkaTopic": "cluster-events"}
kind: ConfigMap
metadata:name: eventrouter-cm

 

验证设置是否正常工作

我们的 cluster-events Kafka 的主题现在应该收到所有的事件。最简单的方法是在主题上运行一个 consumer 来检验是否如此。为了方便期间,我们使用我们的一个 Kafka broker pods,它已经有了所有必要的工具,你可以看到事件流:

kubectl -n kube-events exec kube-events-kafka-0 -- bin/kafka-console-consumer.sh \--bootstrap-server kube-events-kafka-bootstrap:9092 \--topic kube-events \--from-beginning
{"verb":"ADDED","event":{...}}
{"verb":"ADDED","event":{...}}
...

 

编写 Golang 消费者

现在我们想将我们的 Kubernetes 事件依据其所在的命名空间分发到多个主题中。我们将编写一个 Golang 消费者和生产者来实现这一逻辑:

  • 消费者部分在 cluster-events 主题上监听传入的集群事件
  • 生产者部分写入与事件的命名空间相匹配的 Kafka 主题中
     

如果为Kafka配置了适当的选项(默认情况),就不需要特地创建新的主题,因为 Kafka 会默认为你创建主题。这是 Kafka 客户端 API 的一个非常酷的功能。

p, err := kafka.NewProducer(cfg.Endpoint)
if err != nil {sugar.Fatal("cannot create producer")
}
defer p.Close()c, err := kafka.NewConsumer(cfg.Endpoint, cfg.Topic)
if err != nil {sugar.Fatal("cannot create consumer")
}
defer c.Close()run := true
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {sig := <-sigssugar.Infof("signal %s received, terminating", sig)run = false
}()var wg sync.WaitGroup
go func() {wg.Add(1)for run {data, err := c.Read()if err != nil {sugar.Errorf("read event error: %v", err)time.Sleep(5 * time.Second)continue}if data == nil {continue}msg, err := event.CreateDestinationMessage(data)if err != nil {sugar.Errorf("cannot create destination event: %v", err)}p.Write(msg.Topic, msg.Message)}sugar.Info("worker thread done")wg.Done()
}()wg.Wait()

 

完整代码在此处:
https://github.com/esys/kube-events-kafka/blob/master/events-fanout/cmd/main.go
 

当然还有更高性能的选择,这取决于预计的事件量和扇出(fanout)逻辑的复杂性。对于一个更强大的实现,使用 Spark Structured Streaming 的消费者将是一个很好的选择。
 

部署消费者

构建并将二进制文件推送到 Docker 镜像之后,我们将它封装为 Kubernetes deployment:

apiVersion: apps/v1
kind: Deployment
metadata:labels:app: events-fanoutname: events-fanout
spec:replicas: 1selector:matchLabels:app: events-fanouttemplate:metadata:labels:app: events-fanoutspec:containers:- image: emmsys/events-fanout:latestname: events-fanoutcommand: [ "./events-fanout"]args:- -logLevel=infoenv:- name: ENDPOINTvalue: kube-events-kafka-bootstrap:9092- name: TOPICvalue: cluster-events

 

检查目标主题是否创建

现在,新的主题已经创建完成:

kubectl -n kube-events get kafkatopics.kafka.strimzi.io -o namekafkatopic.kafka.strimzi.io/cluster-events
kafkatopic.kafka.strimzi.io/kube-system
kafkatopic.kafka.strimzi.io/default
kafkatopic.kafka.strimzi.io/kafka
kafkatopic.kafka.strimzi.io/kube-events

 

你会发现你的事件根据其命名空间整齐地存储在这些主题中。
 

总结

访问 Kubernetes 历史事件日志可以使你对 Kubernetes 系统的状态有了更好的了解,但这单靠 kubectl 比较难做到。更重要的是,它可以通过对事件做出反应来实现集群或应用运维自动化,并以此来构建可靠、反应灵敏的软件。
 

原文链接:
https://hackernoon.com/monitor-your-kubernetes-cluster-events-with-eventrouter-golang-and-kafka-wh2a35l0

http://www.lryc.cn/news/69516.html

相关文章:

  • 一种基于非均匀分簇和建立簇间路由的算法的无线传感器网络路由协议(Matlab代码实现)
  • usb摄像头驱动打印信息
  • 银行半结构化和无领导群面注意事项
  • 今天公司来了个拿 30K 出来的测试,算是见识到了基础的天花板
  • SSM整合(单元测试、结果封装、异常处理)
  • C++ list
  • 【JavaScript】ES6新特性(2)
  • CST-FSS/周期谐振单元的仿真
  • 重新理解RocketMQ Commit Log存储协议
  • ROS 开发环境搭建(虚拟机版本)(一)
  • vue3做项目是需要注意的事项
  • docker日志轮转
  • 论文阅读_音频压缩_Encodec
  • 第06章_多表查询
  • 自学黑客(网络安全)有哪些技巧——初学者篇
  • CMD与DOS脚本编程【第四章】
  • Liunx安装Docker
  • docker:容器的数据卷
  • 【TCP】对TCP三次握手的个人理解
  • squid的基本代理
  • 【从零开始写视觉SLAM】v0.1基于特征点的简单VO
  • CentOS-7 安装 MariaDB-10.8
  • Packet Tracer – 对 VLAN 实施进行故障排除 – 方案 1
  • 五、c++学习(加餐1:汇编基础学习)
  • iOS正确获取图片参数深入探究及CGImageRef的使用(附源码)
  • Typescript 5.0 发布:快速概览
  • 【图像处理 】卡尔曼滤波器原理
  • YOLOv5 实例分割入门
  • 数字城市发展下的技术趋势,你知道多少?
  • linux 串口改为固定