当前位置: 首页 > news >正文

K8S部署ELK(二):部署Kafka消息队列

目录

1. Kafka 简介

1.1 Kafka 核心概念

(1)消息系统 vs. 流处理平台

(2)核心组件

1.2 Kafka 核心特性

(1)高吞吐 & 低延迟

(2)持久化存储

(3)分布式 & 高可用

(4)水平扩展

(5)流处理能力

1.3 Kafka 典型应用场景

1.4 Kafka 架构示例

数据流示例(订单处理)

1.5 Kafka vs 其他消息队列

2. kafka部署

2.1 创建Namespace

2.2 创建ConfigMap

2.3 创建Headless Service

2.4 创建Statefulset

2.5 部署所有资源

2.6 检查kafka Pod状态


1. Kafka 简介

Apache Kafka 是一个 分布式流处理平台,主要用于构建 高吞吐量、低延迟、可扩展 的实时数据管道和流式应用程序。它最初由 LinkedIn 开发,后成为 Apache 顶级开源项目,广泛应用于大数据、日志聚合、事件驱动架构等领域。


1.1 Kafka 核心概念

(1)消息系统 vs. 流处理平台

  • 传统消息队列(如 RabbitMQ):主要用于解耦生产者和消费者,保证消息可靠传递。

  • Kafka

    • 不仅是一个消息队列,还是一个 分布式流存储系统,支持持久化存储和流式计算。

    • 适用于 高吞吐、大规模数据流 场景(如日志、指标、事件数据)。

(2)核心组件

组件说明
Producer(生产者)向 Kafka 发送消息(如日志、交易数据)。
Consumer(消费者)从 Kafka 读取并处理消息。
Broker(代理)Kafka 服务器,负责存储和转发消息。
Topic(主题)消息的分类(类似数据库表),如 orderslogs
Partition(分区)每个 Topic 可分成多个 Partition,提高并行处理能力。
Offset(偏移量)每条消息在 Partition 中的唯一 ID(类似数据库主键)。
Consumer Group(消费者组)多个消费者共同消费一个 Topic,实现负载均衡。
ZooKeeper管理 Kafka 集群元数据(新版本 Kafka 已逐步移除依赖)。

1.2 Kafka 核心特性

(1)高吞吐 & 低延迟

  • 支持每秒百万级消息处理(取决于硬件配置)。

  • 采用 顺序 I/O(相比随机 I/O 更快)和 零拷贝 技术优化性能。

(2)持久化存储

  • 消息默认持久化到磁盘(可配置保留时间),支持 重放(replay) 数据。

  • 适用于 事件溯源(Event Sourcing)审计日志

(3)分布式 & 高可用

  • 支持 多副本(Replication),防止数据丢失。

  • 自动故障转移(Leader/Follower 机制)。

(4)水平扩展

  • 可动态增加 Broker 和 Partition,提升吞吐量。

(5)流处理能力

  • 配合 Kafka StreamsksqlDB 可实现实时流计算(如聚合、窗口计算)。


1.3 Kafka 典型应用场景

场景说明
日志聚合收集应用日志(替代 ELK 中的 Logstash)。
消息队列解耦微服务,如订单系统 → 库存系统。
实时数据处理结合 Flink/Spark Streaming 做实时分析。
事件驱动架构如用户行为追踪、IoT 设备数据采集。
Commit Log(提交日志)数据库变更捕获(CDC),如 Debezium + Kafka。

1.4 Kafka 架构示例

生产者(Producer) → Kafka Cluster(Broker1, Broker2...)↓
消费者(Consumer Group)→ 实时处理(Flink/Spark)↓存储(HDFS/DB)

数据流示例(订单处理)

  1. 订单服务(Producer)发送消息到 orders Topic。

  2. 库存服务(Consumer)读取 orders 消息,扣减库存。

  3. 分析服务(Consumer)统计实时销售额。


1.5 Kafka vs 其他消息队列

特性KafkaRabbitMQPulsar
吞吐量⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
延迟⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
持久化支持(磁盘)可选(内存/磁盘)支持
流处理原生支持(Kafka Streams)不支持支持(Pulsar Functions)
适用场景大数据、日志任务队列、RPC多租户、云原生

适用 Kafka 的场景

  • 需要高吞吐、持久化存储的实时数据流(如日志、事件)。

  • 流处理(如实时分析、监控)。

不适用 Kafka 的场景

  • 需要复杂路由(RabbitMQ 更合适)。

  • 低延迟任务队列(Redis Streams/RabbitMQ 更好)。

Kafka 已成为现代数据架构的核心组件,广泛应用于大数据、微服务、实时计算等领域。

2. kafka部署

2.1 创建Namespace

kubectl create namespace elk

2.2 创建ConfigMap

vim kafka-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:name: ldc-kafka-scriptsnamespace: elk
data:setup.sh: |-  #启动脚本#!/bin/bashexport KAFKA_CFG_NODE_ID=${MY_POD_NAME##*-} exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh

2.3 创建Headless Service

vim kafka-headless.yaml
apiVersion: v1
kind: Service
metadata:name: kafka-headlessnamespace: elk
spec:clusterIP: Noneselector:app: kafkaports:- name: brokerport: 9092- name: controllerport: 9093

2.4 创建Statefulset

vim kafka-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafkanamespace: elklabels:app: kafka
spec:selector:matchLabels:app: kafkaserviceName: kafka-headlesspodManagementPolicy: Parallelreplicas: 1  #根据资源情况设置实例数,推荐3个副本updateStrategy:type: RollingUpdatetemplate:metadata:labels:app: kafkaspec:affinity:nodeAffinity:  #这里做了节点亲和性调度到master节点requiredDuringSchedulingIgnoredDuringExecution:nodeSelectorTerms:- matchExpressions:- key: node-role.kubernetes.io/control-planeoperator: Exists#values:#- mastertolerations:- key: "node-role.kubernetes.io/control-plane"operator: "Exists"effect: "NoSchedule"containers:- name: kafkaimage: swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/bitnami/kafka:3.4.0imagePullPolicy: "IfNotPresent"command:- /opt/leaderchain/setup.shenv:- name: BITNAMI_DEBUGvalue: "true" #详细日志# KRaft settings - name: MY_POD_NAME # 用于生成KAFKA_CFG_NODE_IDvalueFrom:fieldRef:fieldPath: metadata.name            - name: KAFKA_CFG_PROCESS_ROLESvalue: "controller,broker"- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERSvalue: "0@kafka-0.kafka-headless:9093"  #修改实例数时要更新- name: KAFKA_KRAFT_CLUSTER_IDvalue: "Jc7hwCMorEyPprSI1Iw4sW"  # Listeners            - name: KAFKA_CFG_LISTENERSvalue: "PLAINTEXT://:9092,CONTROLLER://:9093"- name: KAFKA_CFG_ADVERTISED_LISTENERSvalue: "PLAINTEXT://:9092"- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAPvalue: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMESvalue: "CONTROLLER"- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAMEvalue: "PLAINTEXT"- name: ALLOW_PLAINTEXT_LISTENERvalue: "yes"ports:- containerPort: 9092name: broker- containerPort: 9093name: controllerprotocol: TCP                     volumeMounts:- mountPath: /bitnami/kafkaname: kafka-data- mountPath: /opt/leaderchain/setup.shname: scriptssubPath: setup.shreadOnly: true      securityContext:fsGroup: 1001runAsUser: 1001volumes:    - configMap:defaultMode: 493name: ldc-kafka-scripts  #ConfigMap的名字name: scripts                   volumeClaimTemplates:- apiVersion: v1kind: PersistentVolumeClaimmetadata:name: kafka-dataspec:accessModes: [ "ReadWriteOnce" ] storageClassName: nfs-client  #存储类的名称resources:requests:storage: 1Gi

2.5 部署所有资源

[root@master1 Kafka]# ls
kafka-configmap.yaml  kafka-headless.yaml  kafka-statefulset.yaml
[root@master1 Kafka]# kubectl apply -f ./
configmap/ldc-kafka-scripts created
service/kafka-headless created
statefulset.apps/kafka created

2.6 检查kafka Pod状态

[root@master1 Kafka]# kubectl get pod -n elk 
NAME             READY   STATUS    RESTARTS   AGE
filebeat-6db9l   1/1     Running   0          62m
filebeat-qllxg   1/1     Running   0          62m
filebeat-r5hw7   1/1     Running   0          62m
kafka-0          1/1     Running   0          2m2s
http://www.lryc.cn/news/608057.html

相关文章:

  • 深入 Go 底层原理(六):垃圾回收(GC)
  • ubuntu22.04离线一键安装gpu版docker
  • 开源列式分布式数据库clickhouse
  • pyqt5显示任务栏菜单并隐藏主窗口,环境pyqt5+vscode
  • CS课程项目设计7:基于Canvas交互友好的五子棋游戏
  • 从AI智能体出发,重构数据中台:迈向Agentic时代的数据能力体系
  • Docker容器中文PDF生成解决方案
  • Oracle 11gR2 Clusterware应知应会
  • 分布式事务----spring操作多个数据库,事务以及事务回滚还有用吗
  • Oracle 11g RAC集群部署手册(二)
  • Token系列 - 再谈稳定币
  • mac 安装pytho3 和pipx
  • 讲一讲Spring核心三大组件IOC、Bean、AOP
  • 我的世界模组开发教程——物品item(1)
  • Vuex 4.0:Vue.js 应用的状态管理新篇章
  • 深度学习核心:神经网络-激活函数 - 原理、实现及在医学影像领域的应用
  • K8S部署ELK(一):部署Filebeat日志收集器
  • SCI 绘图实用 RGB 配色方案:多组比较
  • [Windows] 微软.Net运行库离线合集包 Microsoft .Net Packages AIO v13.05.25
  • Vue3+ts自定义指令
  • 从毫秒到真义:构建工业级RAG系统的向量检索优化指南
  • 入门MicroPython+ESP32:ESP32烧录MicroPython固件
  • Python进阶(5):类与继承
  • Unity_数据持久化_XML存储相关
  • 探索:Uniapp 安卓热更新
  • 智能合约漏洞导致的损失,法律责任应如何分配
  • 医疗后台管理系统开发实践
  • 分类任务当中常见指标 F1分数、recall、准确率分别是什么含义
  • 通过解决docker network connect实现同一个宿主机不同网络的容器间通信
  • 【stm32】点灯及蜂鸣器