当前位置: 首页 > news >正文

四十、大数据技术之Kafka3.x(3)

🌻🌻 目录

  • 一、Kafka Broker
    • 1.1 Kafka Broker工作流程
      • 1.1.1 Zookeeper 存储的Kafka信息
      • 1.1.2 Kafka Broker 总体工作流程
      • 1.1.3 Broker 重要参数
    • 1.2 生产经验——节点服役和退役
      • 1.2.1 服役新节点
      • 1.2.2 退役旧节点
    • 1.3 Kafka 副本
      • 1.3.1 副本基本信息
      • 1.3.2 Leader 选举流程
      • 1.3.3 Leader 和 Follower 故障处理细节
      • 1.3.4 分区副本分配
      • 1.3.5 生产经验——手动调整分区副本存储
      • 1.3.6 生产经验——Leader Partition负载平衡
      • 1.3.7 生产经验——增加副本因子
    • 1.4 文件存储
      • 1.4.1 文件存储机制
      • 1.4.2 文件清理策略
    • 1.5 高效读写数据

一、Kafka Broker

1.1 Kafka Broker工作流程

1.1.1 Zookeeper 存储的Kafka信息

(1)启动Zookeeper客户端。

cd /usr/local/zookeeper/bin/ls./zkCli.sh 

在这里插入图片描述

(2)通过ls命令可以查看kafka相关信息。

ls /ls /kafka/ls /kafka/brokers/ls /kafka/brokers/ids

在这里插入图片描述

zookeerper 可视化工具 PrettyZoo:

  • 下载
  • 使用参考

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

1.1.2 Kafka Broker 总体工作流程

在这里插入图片描述

1)模拟Kafka上下线,Zookeeper中数据变化

(1)查看/kafka/brokers/ids路径上的节点。

./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties./kafka-server-stop.sh

在这里插入图片描述

在这里插入图片描述

1.1.3 Broker 重要参数

在这里插入图片描述

1.2 生产经验——节点服役和退役

1.2.1 服役新节点

1)新节点准备
(1)关闭linux-102(已经装了hadoop,jdk,zookeeper,kafka),并右键执行克隆操作,linux-103,linux-104(注:之前若有直接可以删掉重新克隆)
(2)开启linux-103,linux-104 并修改IP地址。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

④ 修改ip,将192.168.10.102修改为 192.168.10.103,保存,并重启网络

在这里插入图片描述

在这里插入图片描述

⑤ 修改 主机名

在这里插入图片描述
在这里插入图片描述

⑥ 重启进行远程连接

在这里插入图片描述

⑦ 删除日志,并分别修改linux-103与linux-102 的节点 id 和服务器 ip,并开启集群配置

在这里插入图片描述

在这里插入图片描述

开启集群配置(linux-102,linux-103,linux-104都需修改)

在这里插入图片描述

(3) 先启动linux-102,再启动linux-103,再次启动 linux-104(脚本启动后期更新)

cd /usr/local/zookeeper/bin/./zkServer.sh start./zkServer.sh statuscd /usr/local/kafka/bin./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

(4) 查看 linux-102 上有哪些主题,并分配了分区(生产环境建议至少分两个副区,如果不小心删除一个,则还有一个类似备份的)

./kafka-topics.sh --bootstrap-server linux-102:9092 --list./kafka-topics.sh --bootstrap-server linux-102:9092 --topic first --describe

在这里插入图片描述

思考:如何将linux-102上面的一些分到linux-103达到负载均衡呢?看下面 2)

2)执行负载均衡操作

(1)创建一个要均衡的主题(在 linux-102 服务器的 kafka下面创建)。

cd /usr/local/kafkavi topics-to-move.json

在这里插入图片描述

{"topics": [{"topic": "first"}],"version": 1
}

在这里插入图片描述

(2)生成一个负载均衡的计划(在 linux-102 服务器的 kafka下面生成)。

# 0,1,2 代表三台服务器
bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092  --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate

遇到错误java.io.IOException: Unable to read file topics-to-move.json

在这里插入图片描述

解决:直接切换到kafka 根目录进行生成即可:

在这里插入图片描述

(3)创建副本存储计划(所有副本存储在broker2broker3broker4)。

vi increase-replication-factor.json

在这里插入图片描述

输入如下内容:

{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":3,"replicas":[2],"log_dirs":["any"]}]}

在这里插入图片描述

(4)执行副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092 --reassignment-json-file increase-replication-factor.json --execute

在这里插入图片描述

(5)验证副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092 --reassignment-json-file increase-replication-factor.json --verify

在这里插入图片描述

上述操作都是在linux-102上操作的。

1.2.2 退役旧节点

1)执行负载均衡操作

先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡

(1)创建一个要均衡的主题。

与上面(1)创建一个要均衡的主题(在 linux-102 服务器的 kafka下面创建)。一样无需再创建。

(2)创建执行计划。

#上面是分配到了三台机器 2,3,4,现在移除 4即服务器 linux-104
bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "2,3" --generate

在这里插入图片描述

(3)创建副本存储计划(所有副本存储在broker2broker3)。

vi increase-replication-factor.json

在这里插入图片描述

删除前面的,将上面复制的粘贴里面保存即可。

在这里插入图片描述

粘贴上面复制的如下内容:

{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":3,"replicas":[2],"log_dirs":["any"]}]}

(4)执行副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092 --reassignment-json-file increase-replication-factor.json --execute

在这里插入图片描述

(5)验证副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092 --reassignment-json-file increase-replication-factor.json --verify

在这里插入图片描述
2)执行停止命令

linux-104上执行停止命令即可

./kafka-server-stop.sh

在这里插入图片描述

1.3 Kafka 副本

下面的所有后期会详细更新(可跳过直接看 👉👉大数据技术之Kafka3.x(4)

1.3.1 副本基本信息

  • (1)Kafka 副本作用:提高数据可靠性
  • (2)Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
  • (3)Kafka中副本分为:Leader和Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。
    (4)Kafka分区中的所有副本统称为AR(Assigned Repllicas)。
    AR = ISR + OSR
    ISR,表示和Leader保持同步的Follower集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从ISR中选举新的Leader。
    OSR,表示Follower与Leader副本同步时,延迟过多的副本。

1.3.2 Leader 选举流程

(跳过后期更新)

Kafka 集群中有一个brokerController会被选举为Controller Leader,负责管理集群 broker 的上下线,所有topic分区副本分配Leader选举等工作。
Controller的信息同步工作是依赖于Zookeeper的。

在这里插入图片描述

开启 linux-102,linux-103,linux-104,进行如下操作:

(1)创建一个新的topic,4个分区,4个副本(因为我是单节点,所以设置了 1)

bin/kafka-topics.sh --bootstrap-server linux-102:9092 --create --topic Daniel1 --partitions 1 --replication-factor 1

在这里插入图片描述

(2)查看Leader分布情况

bin/kafka-topics.sh --bootstrap-server linux-102:9092 --describe --topic Daniel1

在这里插入图片描述

(3)停止掉linux-103的kafka进程,并查看Leader分区情况

在这里插入图片描述

(4)停止掉linux-104的kafka进程,并查看Leader分区情况
(5)启动linux-104的kafka进程,并查看Leader分区情况
(6)启动linux-104的kafka进程,并查看Leader分区情况
(7)停止掉linux-103的kafka进程,并查看Leader分区情况

1.3.3 Leader 和 Follower 故障处理细节

在这里插入图片描述

1.3.4 分区副本分配

(跳过后期更新)

如果kafka服务器只有4个节点,那么设置kafka的分区数大于服务器台数,在kafka底层如何分配存储副本呢?

1)创建16分区,3个副本

(1)创建一个新的topic,名称为second。

bin/kafka-topics.sh --bootstrap-server linux-102:9092 --create --topic second --partitions 16 --replication-factor 3

在这里插入图片描述

(2)查看分区和副本情况。

在这里插入图片描述

在这里插入图片描述

1.3.5 生产经验——手动调整分区副本存储

在这里插入图片描述

手动调整分区副本存储的步骤如下:

(1)创建一个新的topic,名称为three。

在这里插入图片描述

(2)查看分区副本存储情况。

在这里插入图片描述

(3)创建副本存储计划(所有副本都指定存储在broker0、broker1中)。
(4)执行副本存储计划。
(5)验证副本存储计划。
(6)查看分区副本存储情况。

1.3.6 生产经验——Leader Partition负载平衡

在这里插入图片描述

在这里插入图片描述

1.3.7 生产经验——增加副本因子

在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。

1)创建topic
2)手动增加副本存储
(1)创建副本存储计划(所有副本都指定存储在broker0、broker1、broker2中)。
(2)执行副本存储计划。

1.4 文件存储

1.4.1 文件存储机制

1)Topic数据的存储机制

在这里插入图片描述

2)思考:Topic数据到底存储在什么位置?

(1)启动生产者,并发送消息。
(2)查看linux-102(或者linux-103、linux-104)的/usr/local/kafka/datas/first-1(first-0、first-2)路径上的文件。
(3)直接查看log日志,发现是乱码。
(4)通过工具查看index和log信息。
3)index文件和log文件详解

在这里插入图片描述

说明:日志存储参数配置

在这里插入图片描述

1.4.2 文件清理策略

Kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间。

Kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间。

  • log.retention.hours,最低优先级小时,默认7天。
  • log.retention.minutes,分钟。
  • log.retention.ms,最高优先级毫秒。
  • log.retention.check.interval.ms,负责设置检查周期,默认5分钟。

那么日志一旦超过了设置的时间,怎么处理呢?

Kafka中提供的日志清理策略有delete和compact两种。
1)delete日志删除:将过期数据删除

  • log.cleanup.policy = delete 所有数据启用删除策略
    (1)基于时间:默认打开。以segment中所有记录中的最大时间戳作为该文件时间戳。
    (2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment。
    log.retention.bytes,默认等于-1,表示无穷大。

思考:如果一个segment中有一部分数据过期,一部分没有过期,怎么处理?

在这里插入图片描述

2)compact日志压缩

在这里插入图片描述

1.5 高效读写数据

  • 1)Kafka本身是分布式集群,可以采用分区技术,并行度高
  • 2)读数据采用稀疏索引,可以快速定位要消费的数据
  • 3)顺序写磁盘

Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

在这里插入图片描述

4)页缓存 + 零拷贝技术

在这里插入图片描述

http://www.lryc.cn/news/426017.html

相关文章:

  • redis——基本命令
  • pytorch实现单层线性回归模型
  • 智能小家电能否利用亚马逊VC搭上跨境快车?——WAYLI威利跨境助力商家
  • 顺丰科技25届秋季校园招聘常见问题答疑及校招网申测评笔试题型分析SHL题库Verify测评
  • 深入理解 Kibana 配置文件:一份详尽的指南
  • 算法的学习笔记—链表中倒数第 K 个结点(牛客JZ22)
  • 聊聊场景及场景测试
  • Spring Web MVC入门(中)
  • Django后端架构开发:后台管理与会话技术详解
  • 挑战Infiniband, 爆改Ethernet(2)
  • Postman文件上传接口测试
  • stm32入门学习14-电源控制
  • [C++][opencv]基于opencv实现photoshop算法色相和饱和度调整
  • Github 2024-08-16Java开源项目日报 Top10
  • AI学习记录 - torch 的 matmul和dot的关联,也就是点乘和点积的联系
  • leetcode 885. Spiral Matrix III
  • mysql windows安装与远程连接配置
  • 子网掩码是什么以及子网掩码相关计算
  • 仿RabbitMQ实现消息队列
  • SpringBoot教程(二十三) | SpringBoot实现分布式定时任务之xxl-job
  • 微前端架构的数据持久化策略与实践
  • 讲解 狼人杀中的买单双是什么意思
  • 回归分析系列5-贝叶斯回归
  • oracle 数据中lsnrctl 是干啥的
  • Linux进程--进程地址空间
  • C语言传递指针给函数
  • 探索 Kubernetes 持久化存储之 Rook Ceph 初窥门径
  • 今日(2024 年 8 月 13 日)科技新闻
  • Unity大场景切换进行异步加载时,如何设计加载进度条,并配置滑动条按照的曲线给定的速率滑动
  • Selenium + Python 自动化测试16(Python基础复习)