最全Kafka知识宝典之Kafka的基本使用
一、基本概念
传统上定义是一个分布式的基于发布/订阅模式的消息队列,主要应用在大数据实时处理场景,现在Kafka已经定义为一个分布式流平台,用于数据通道处理,数据流分析,数据集成和关键任务应用
必须了解的四个特性:
1、Kafak可以无缝的支持多个生产者,也支持多个消费者从一个单独的消息流中读取数据,且各个消费者之间互不影响
2、Kafka还允许消费者非实时读取消息,因为Kafka将消息按一定顺序持久化到磁盘,保证了数据不会丢失,顺序写磁盘的效率比随机写内存还要高,而且以时间复杂度为O(1)的方式提供消息持久化能力,对TB级以上的数据也能保证常数时间的访问性能。
3、由于Kafka可横向扩展生产者、消费者、broker,使得集群可以轻松处理巨大的消息流,在处理大量数据的同时,还能保证亚秒级的消息延迟,实时性极高。
4、Kafka消息都会在集群中进行备份,每个分区都有一台server作为leader,其他server作为follwers,当leader宕机了,follower中的一台server会自动成为新的leader,继续有条不紊的工作,所以容错性很高且集群的负载是平衡的。
一句话总结Kafka:
Kafka 适合高吞吐量、低延迟和大规模数据流处理的场景,尤其是需要实时数据传输和处理的大数据应用,而其他 MQ 在这些方面可能无法满足性能要求。
二、Kafka的架构
从上图就可以看出,Kafka的几个核心角色:生产者、消费者、Broker、zookeeper
product
生产者可以向消息队列发送各种类型的消息,由消费者消费。
这里注意,这里的生产者是可以有多个生产者服务的实例的
consumer
消费者从broker中消费消息。
这里需要注意,消费者也可以包含多个消费者实例,去消费同一个topic中的消息
总结:生产者与消费者都可以有多个,他们之间由topic来绑定关系
zookeeper
ZooKeeper 负责跟踪 Kafka 集群中的所有 Broker 节点,确保每个节点的状态(在线或离线)都被准确记录。其主要目的就是对所有broker做管理。比如:当某个主题的分区需要选举一个新的领导者时,ZooKeeper 负责协调这一过程,确保选举过程的公平性和一致性。
Broker
Broker就是Kafka消息服务的实例,可以部署多个broker形成kafka集群,在broker当中主要由下面这么几个角色来组成,我们以数据库的形式来类比介绍:
topic
这个相当于数据库中的数据表名,一个业务有一张表,也就是有一个topic,生产者往这个topic写消息,消费者从这个topic读消息
partition
这个相当于分表的概念,一张数据表太大了,查询影响性能,所以将一张大表水平拆分成多张表。所以partition就是将一个大的topic数据拆分成多个分区
Replication
这个是对每个partition做个备份,指定了一个partition需要多少个备份,那么我就会备份几份均匀分配到集群中所有的broker上
Leader&Follower
每个分区都有一个leader以及0个或者多个follower,在创建topic时,Kafka会将每个分区的leader均匀地分配在每个broker上,我们正常使用kafka是感觉不到leader、follower的存在的。
但其实,所有的读写操作都是由leader处理,而所有的follower都复制leader的日志数据文件,如果leader出现故障时,follower就会被选举为leader。
- Kafka中的leader负责处理读写操作,而follower只负责副本数据的同步
- 如果leader出现故障,其他follower会被重新选举为leader
- follower像一个consumer一样,拉取leader对应分区的数据,并保存到日志数据文件中
三、消息模型
消息由生产者发送到kafka集群后,会被消费者消费,一般来说我们的消费模型有两种:推送模型(psuh)和拉取模型(pull)
首先明确一下推拉模式到底是在讨论消息队列的哪一个步骤,一般而言我们在谈论推拉模式的时候指的是 Comsumer 和 Broker 之间的交互。
默认的认为 Producer 与 Broker 之间就是推的方式,即 Producer 将消息推送给 Broker,而不是 Broker 主动去拉取消息。
推模式
推模式指的是消息从 Broker 推向 Consumer,即 Consumer 被动的接收消息,由 Broker 来主导消息的发送
优点:
- 消息实时性高, Broker 接受完消息之后可以立马推送给 Consumer
- 对于消费者使用来说更简单,简单啊就等着,反正有消息来了就会推过来。
缺点:
- 推送速率难以适应消费速率,推模式的目标就是以最快的速度推送消息,当生产者往 Broker 发送消息的速率大于消费者消费消息的速率时,随着时间增长就会把消费者撑死,因为根本消费不过来啊
- 并且不同的消费者的消费速率还不一样,身为 Broker 很难平衡每个消费者的推送速率,如果要实现自适应的推送速率那就需要在推送的时候消费者告诉 Broker ,我不行了你推慢点吧,然后 Broker 需要维护每个消费者的状态进行推送速率的变更。
拉模式
拉模式指的是 Consumer 主动向 Broker 请求拉取消息,即 Broker 被动的发送消息给 Consumer
优点:
- 拉模式主动权就在消费者身上了,消费者可以根据自身的情况来发起拉取消息的请求,假设当前消费者觉得自己消费不过来了,它可以根据一定的策略停止拉取,或者间隔拉取都行。
- 拉模式下 Broker 就相对轻松了,它只管存生产者发来的消息,至于消费的时候自然由消费者主动发起,来一个请求就给它消息呗,从哪开始拿消息,拿多少消费者都告诉它,它就是一个没有感情的工具人,消费者要是没来取也不关它的事。
- 拉模式可以更合适的进行消息的批量发送,基于推模式可以来一个消息就推送,也可以缓存一些消息之后再推送,但是推送的时候其实不知道消费者到底能不能一次性处理这么多消息,而拉模式就更加合理,它可以参考消费者请求的信息来决定缓存多少消息之后批量发送。
缺点:
- 消息延迟,毕竟是消费者去拉取消息,但是消费者怎么知道消息到了呢?所以它只能不断地拉取,但是又不能很频繁地请求,太频繁了就变成消费者在攻击 Broker 了,因此需要降低请求的频率,比如隔个 2 秒请求一次,你看着消息就很有可能延迟 2 秒了。
- 消息忙请求,忙请求就是比如消息隔了几个小时才有,那么在几个小时之内消费者的请求都是无效的,在做无用功。
Kafka是什么模式
RocketMQ 和 Kafka 都选择了拉模式,当然业界也有基于推模式的消息队列如 ActiveMQ
Kafka采用了一种长轮询模式,这是基于拉模式的
Kafka 在拉请求中有参数,可以使得消费者请求在 “长轮询” 中阻塞等待。
简单的说就是消费者去 Broker 拉消息,定义了一个超时时间,也就是说消费者去请求消息,如果有的话马上返回消息,如果没有的话消费者等着直到超时,然后再次发起拉消息请求。注意:阻塞是发生在broker端的。
并且 Broker 也得配合,如果消费者请求过来,有消息肯定马上返回,没有消息那就建立一个延迟操作,等条件满足了再返回
四、Kafka的安装使用
1、安装
我们以三点Kafka节点的形式对Kafka进行安装部署
创建数据目录
mkdir -p /tmp/kafka/broker{1..3}/{data,logs}
创建配置文件
vi docker-compose.yaml
version: '2'
services:zookeeper:container_name: zookeeperimage: wurstmeister/zookeeperrestart: unless-stoppedhostname: zoo1ports:- "2181:2181"networks:- kafkakafka1:container_name: kafka1image: wurstmeister/kafkaports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.245.253 ## 修改:宿主机IPKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.245.253:9092 ## 修改:宿主机IPKAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"KAFKA_ADVERTISED_PORT: 9092KAFKA_BROKER_ID: 1KAFKA_LOG_DIRS: /kafka/datavolumes:- /tmp/kafka/broker1/logs:/opt/kafka/logs- /tmp/kafka/broker1/data:/kafka/datadepends_on:- zookeepernetworks:- kafkakafka2:container_name: kafka2image: wurstmeister/kafkaports:- "9093:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.245.253 ## 修改:宿主机IPKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.245.253:9093 ## 修改:宿主机IPKAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"KAFKA_ADVERTISED_PORT: 9093KAFKA_BROKER_ID: 2KAFKA_LOG_DIRS: /kafka/datavolumes:- /tmp/kafka/broker2/logs:/opt/kafka/logs- /tmp/kafka/broker2/data:/kafka/datadepends_on:- zookeepernetworks:- kafkakafka3:container_name: kafka3image: wurstmeister/kafkaports:- "9094:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.245.253 ## 修改:宿主机IPKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.245.253:9094 ## 修改:宿主机IPKAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"KAFKA_ADVERTISED_PORT: 9094KAFKA_BROKER_ID: 3KAFKA_LOG_DIRS: /kafka/datavolumes:- /tmp/kafka/broker3/logs:/opt/kafka/logs- /tmp/kafka/broker3/data:/kafka/datadepends_on:- zookeepernetworks:- kafkakafka-manager:image: sheepkiller/kafka-manager ## 镜像:开源的web管理kafka集群的界面environment:ZK_HOSTS: 192.168.245.253 ## 修改:宿主机IPports:- "9000:9000" ## 暴露端口networks:- kafka
networks:kafka:driver: bridge
启动
docker-compose up -d
2、命令行使用
首先进入kafka容器内部:
docker exec -it kafka1 /bin/bash
进入到Kafka安装目录
cd /opt/kafka
执行脚本
创建主题
sh bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test
查看所有主题
sh bin/kafka-topics.sh --zookeeper zookeeper:2181 -list
查看分区情况
sh bin/kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test
创建分区以及副本
sh bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test --partitions 3 --replication-factor 3
扩容分区
sh bin/kafka-topics.sh --zookeeper zookeeper:2181 --partitions 4 --alter --topic test
测试生产者发送消息
sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
测试消费者接收消息
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
五、SpringBoot整合Kafka
1、添加依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies>
2、配置Kafka
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-groupauto-offset-reset: earliestproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3、创建生产者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);System.out.println("Sent message: " + message);}
}
4、创建消费者
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {@KafkaListener(topics = "test", groupId = "my-group")public void consume(String message) {System.out.println("Received message: " + message);}
}