当前位置: 首页 > news >正文

浅学 Kafka

目录

kafka 体验卡

核心概念

partition

消费者组

搭建Zookeeper集群

搭建Kafak集群

消费者组分组消费机制

生产者拦截器机制

消息序列号机制

消息分区路由机制

生产者消息缓存机制

发送应答机制

生产者消息幂等性

生产者消息压缩 与 消息事务机制

SpringBoot 集成 Kafka


kafka 体验卡

kafk官网: Apache Kafka
当前基于 kafka_2.13-3.8.1.tgz 学习

# 解压 kafka

tar -zxvf kafka_2.13-3.8.1.tgz

# 进入安装目录

cd /home/app/kafka/kafka_2.13-3.8.1

# 基于默认配置 单机启动 kafka

nohup bin/kafka-server-start.sh config/server.properties &

# 基于默认配置 启动 kafka 自带的 zookeeper 服务

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

# 创建一个名为 test 的 topic,kafka默认端口为 9092

bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic test --create

# 查看 topic 列表 

bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic test --list

创建一个基于控制台的 向 test topic 发消息的生产者

bin/kafka-console-producer.sh --bootstrap-server localhost:9092  --topic test

创建一个基于控制台的 接受 test topic 的消费者

 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

核心概念

partition

  1. 一个 Topic 可以被分成多个 Partition(物理分区),每个 Partition 是一个 有序、不可变的消息日志,消息以追加的方式写入 Partition。
  2. Kafka 通过多个 Partition 来实现消息吞吐量的提升。可以把不同的 Partition 分配到不同的 Broker 上,从而分散负载,在创建topci的时候可以指定 partition 的数量,这一组 partition 会选举一个 leader 负责对外的读写请求,其他的 Replicas(副本) 负责复制数据
  3. 通过 Replication(副本),每个 Partition 可以有多个副本分布在不同的 Broker 上,防止数据丢失。
  4. Kafka 只保证 同一个 Partition 内的消息是有序的,不同 Partition 之间不保证顺序。
  5. 不同的消费者可以消费不同的 Partition,提高消费能力。

消费者组

  1. 在创建消费者的时候指定一样的 group.id 就是一组消费者组

搭建Zookeeper集群

基于3台服务器搭建zookeeper集群,以其中一个服务器为例子(下载 apache-zookeeper-3.8.4-bin.tar 的时候注意名字带有 -bin的才是)

# 解压zookeeper

cd /home/app/zookeeper

tar -xzf apache-zookeeper-3.8.4-bin.tar.gz
cd /home/app/zookeeper/apache-zookeeper-3.8.4-bin


# 创建 data 和 logs 目录

mkdir -p ./data ./logs

# 复制一份默认的配置文件模板,并重命名为conf/zoo.cfg(zookeeper默认识别配置文件zoo.cfg)
cp conf/zoo_sample.cfg conf/zoo.cfg

# 配置zoo.cfg
vim 


# 创建 myid 文件,该文件 中的 1 是 zookeeper 的唯一标识,相当于身份证

echo "1" > /home/app/zookeeper/apache-zookeeper-3.8.4-bin/data/myid


# 启动 zookeeper
bin/zkServer.sh --config conf start

  

搭建Kafak集群

基于3台服务器搭建kafka集群,以其中一个服务器为例子

编辑 server.properties 配置文件

        
vim config/server.properties


后台启动 kafka

nohup /home/app/kafka/kafka_2.13-3.8.1/bin/kafka-server-start.sh config/server.properties &

 Kafka 消息流转模型

消费者组分组消费机制

消费者消费完消息要给服务端响应,如果没有给响应,服务端就会认为消费者消费失败,就会重新给消费者组的其他成员推送消息。

消息消费到哪里了是由服务端的 offset 决定的 不由消费者决定。

  1. 当服务端消息丢失的时候,消费者是不知道的,会继续向服务端拉消息,为了解决这个问题可以在编写 消费者代码的时候去设置当拉不到消息的时候抛异常等。
  2. 当服务端的 offset 记录出错的时,那么消费者拉取到的消息就是错误的,为了解决这个问题,企业中常用做法是:当消费者拉取消息的时候,把这个 topic 作为hash key ,offset 作为 value 缓存到 redis 中。当下一次拉取消息的时候会去读 redis,比较上一次的 offset 就可以知道来取的消息是否有误,还可以做幂等性校验。

生产者拦截器机制

生产者拦截器(Producer Interceptor) 是一种允许你在消息发送到 Kafka 之前或之后执行自定义逻辑的机制。这种机制可以帮助你实现诸如监控、修改消息内容、记录日志等功能,而无需改变业务逻辑代码。
实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口,并重写以下方法:

注意:此方法在 Kafka 客户端内部线程中执行,因此应尽量避免在此方法中执行耗时操作,以免影响性能。

消息序列号机制

kafka 服务端是以 二进制数组 接收传输的,因此在发送和接收消息都要做序列号处理。

kafka大多处理的是海量数据,这个 序列化的性能 以及 序列号后的比特大小 对 kafka 的影响会被放大N倍, 序列号后的比特大小 也是影响 消息存储 所占磁盘多少的一个很大因素。

对与传 常规的 字符串 或者 数字,可以直接使用 kafak 提供的序列号工具即可。如果传的是 对象User 就需要自定义序列号功能,因为kafak 没有提供 这种对象的 序列号。至于如何写出高性能序列号 交给AI吧 

消息分区路由机制

  • 顺序保证:Kafka 只能保证在单个分区内的消息顺序。因此,如果你的应用需要保持某些消息的顺序,可以通过控制这些消息进入同一个分区来实现。
  • 负载均衡:通过合理地将消息分发到不同的分区,可以有效地分散负载,提高系统的吞吐量和响应速度。
  • 容错能力:每个分区都有多个副本(Replicas),这提供了数据冗余,增强了系统的容错能力和可靠性。

生产者分区路由机制

  1. 如果在 ProducerRecord<K, V> 中指定了 Key,那么 Kafka 将根据 Key 的哈希值来选择分区。也可以通过设置 partitioner.class 属性来自定义分区策略,从而改变默认的分区逻辑
  2. 如果没有指定 Key,Kafka 会采用轮询的方式将消息均匀地分配给所有可用的分区。注意,这里的“轮询”是基于每个生产者的独立计数器进行的,而不是全局的。
  3. 通过实现 org.apache.kafka.clients.producer.Partitioner 接口来自定义分区逻辑。例如,根据业务需求将特定类型的消息定向到特定的分区。

当一个主题有多个分区,并且这些分区需要被分配给一个消费者组中的多个消费者时,Kafka 使用所谓的“分区分配策略”来决定哪个消费者应该负责哪个分区。

消费者分区路由机制

  1. RangeAssignor:将每个主题的分区按顺序排列,并平均分配给消费者。如果不能均分,则前面的消费者会多分配一个或几个分区。
  2.  RoundRobinAssignor:将所有主题的所有分区按照字典序排序,然后轮询地分配给消费者

  3. StickyAssignor:保持现有的分区分配尽可能不变的同时,尽量做到均衡分配。当发生再平衡时,它会尝试最小化分区迁移的数量

生产者消息缓存机制

发送应答机制

🛠️ acks 参数详解

是控制生产者发送消息后如何得到确认的核心参数。 客户端生产者可以配置

1. acks=0

  • 含义:生产者不会等待来自服务器的任何确认。消息一旦被发送出去,就认为已经成功了。
  • 优点:提供最高的吞吐量,因为不需要等待确认。
  • 缺点:如果消息没有到达 broker 或者 broker 在接收消息后崩溃,消息将会丢失。

2. acks=1

  • 含义:生产者会等待 leader 副本确认已收到消息。这意味着只要 leader 已经写入消息,即使 follower 还没有复制该消息,生产者也会认为消息已成功发送。
  • 优点:相比于 acks=0 提供更高的可靠性,同时保持较好的性能。
  • 缺点:如果 leader 在确认之后但在同步给 follower 之前崩溃,则消息可能会丢失。

3. acks=all 或 -1

  • 含义:生产者将等待所有同步副本(包括leader partition 和 副本)确认收到消息。这是最强的一致性保障模式。
  • 优点:提供了最高级别的数据可靠性,确保消息不仅被 leader 接收,还至少被一个 follower 成功复制。
  • 缺点:由于需要等待更多的确认,这会导致较高的延迟,并可能降低系统的吞吐量。
但是acks = all 或者 或者 0 都太绝对太极端了,可以选择更折中的配置:
min.insync.replicas :

生产者消息幂等性

pid 对用户是不可见的,生产者发消息的时候会携带 pid 和 sequence,写下数据到 partition 时会存下 这些值用于下次比较。

生产者消息压缩 与 消息事务机制

消息压缩机制

消息事务机制

生产者的幂等性只能保证在单机中?

事务机制依赖于幂等性来避免重复消息的问题。

Producer提供了对应的API:

SpringBoot 集成 Kafka

<!--  spring 社区提供的 kafka 依赖-->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
案例代码就不展示了
http://www.lryc.cn/news/581932.html

相关文章:

  • 汽车功能安全系统阶段开发【技术安全需求TSR】4
  • 图像处理中的边缘填充:原理与实践
  • 【保姆级图文详解】大模型、Spring AI编程调用大模型
  • 2025最新如何解决VSCode远程连接开发机失败/解决方案大全
  • Python操作mysql数据库:数据库三层结构,Mysql建表语句操作,mysql的数据库备份,mysql的数据库恢复
  • 图像处理中的插值方法:原理与实践
  • ​​MySQL高可用架构深度解析:主从复制、MGR与读写分离实战​​
  • 使用 GDB 调试 Redis 服务进程指南
  • PC端基于SpringBoot架构控制无人机(三):系统架构设计
  • FlashDepth | 混合模型+Mamba革新,24 FPS实时2K视频深度估计,超越Depth Anything v2
  • (倍增)洛谷 P1613 跑路/P4155 国旗计划
  • ZooKeeper 实现分布式锁
  • 【Note】《Kafka: The Definitive Guide》 第5章:深入 Kafka 内部结构,理解分布式日志系统的核心奥秘
  • 【kafka-python使用学习笔记2】Python操作Kafka之环境准备(2)亲测有效有图有真相
  • 专为磁盘存储设计的数据结构——B树
  • 快速上手百宝箱搭建知识闯关游戏助手
  • 第二届虚拟现实、图像和信号处理国际学术会议(VRISP 2025)
  • Java面试宝典:异常
  • Python实现MCP Server的完整Demo
  • 北京-4年功能测试2年空窗-报培训班学测开-第四十四天
  • 《Effective Python》第十二章 数据结构与算法——当精度至关重要时使用 decimal
  • Node.js特训专栏-实战进阶:14.JWT令牌认证原理与实现
  • 《30天打牢数模基础-第一版》(已完结) 需要自取
  • macOS运行python程序遇libiomp5.dylib库冲突错误解决方案
  • 基于Rust红岩题材游戏、汽车控制系统、机器人运动学游戏实例
  • 在内网环境中,Java服务调用PHP接口时报错的排查方法
  • Mac 电脑无法读取硬盘的解决方案
  • AI智能体长期记忆系统架构设计与落地实践:从理论到生产部署
  • 文件操作(java)
  • window显示驱动开发—X 通道解释