KafKa 项目 -- GitHub 学习
1、情景导入
相信各位对于Kafka都不陌生吧,那就让我们来聊聊它的一些相关的底层源码吧
首先,我们通过科学的上网方式在github中下载分支为kafka-trunk相关源码包
这边建议通过下载zip的方式进行下载,好像用git clone 的方式不好拉取代码,当然这样也会有一个缺点,那就是你很难参与到这个项目的代码提交中去。
好的,然后让我们来看一些这个项目包的结构吧
下面就让我来一一介绍这些包他们的作用吧
(1) 模块文件夹
clients:包含 Kafka 客户端代码,用于与 Kafka 服务器进行交互。
core:包含 Kafka 核心功能的实现,如消息传递、存储和网络层。
server:包含 Kafka 服务器端代码,用于处理客户端请求和管理集群。
connect:包含 Kafka Connect 组件的代码,用于与外部系统进行数据集成。
streams:包含 Kafka Streams 的实现,用于构建流处理应用程序。
tools:包含各种工具类和实用程序,用于支持 Kafka 的运行和管理。
transaction-coordinator:管理事务的协调器,确保跨分区事务的一致性。
group-coordinator:负责消费者组的管理和协调。
raft:实现 Raft 算法,用于 Kafka 控制器的选举和一致性。
storage:处理 Kafka 的存储层,包括日志段和偏移量管理。
metadata:管理和存储 Kafka 的元数据。
tests:包含各种测试类和测试用例,用于验证 Kafka 的功能和性能。
shell:包含 Kafka Shell 工具的代码,用于命令行交互。
examples:提供一些示例代码,帮助用户理解和使用 Kafka。
docker:包含与 Docker 相关的配置和脚本,用于容器化部署。
docs:存放 Kafka 的文档,包括用户指南、API 文档和设计文档。
vagrant:包含 Vagrant 配置,用于虚拟化开发环境。
(2) 配置文件和脚本文件
build.gradle:Gradle 构建脚本,用于定义项目的构建逻辑、依赖关系和任务。
settings.gradle:Gradle 设置文件,用于配置多项目构建的项目结构。
gradlew 和 gradlewAll:Gradle Wrapper 脚本,允许在没有安装 Gradle 的环境下运行构建。
wrapper.gradle:Gradle Wrapper 的配置文件。
gradle.properties:Gradle 属性文件,用于配置构建的属性和参数。
checkstyle:存放 Checkstyle 配置文件,用于代码风格检查。
.github:包含 GitHub Actions 的配置文件,用于自动化工作流和 CI/CD。
.idea:IntelliJ IDEA 的项目配置文件夹,包含 IDE 特定的设置。
.gradle:Gradle 的缓存和配置文件夹,用于存储构建缓存和插件配置。
(3) 文档和其他文件
README.md:项目的 README 文件,提供项目的简介、安装和使用指南。
CONTRIBUTING.md:贡献指南,说明如何参与项目的开发和贡献代码。
LICENSE 和 NOTICE:项目的许可证和通知文件,说明项目的法律信息和版权。
HEADER:文件头模板,用于在源代码文件中添加版权信息。
doap_Kafka.rdf:项目元数据文件,以 RDF 格式描述项目信息。
Vagrantfile:Vagrant 配置文件,用于定义虚拟机环境。
2、代码分析与阅读
我们知道,在trunk分支中,主要是给我们开发者进行开发和测试的(因为这里含有代码检查相关工具包)以及相关的测试工具类等的。但是没有关系我们可以通过阅读他们的测试代码中的一些Api之间的调用来了解他们这些方法做了什么以及之间的逻辑思维和关联等。那么废话不多说,让我们现在开始吧。
2.1AdminFenceProducersTest
2.1.1 testFenceAfterProducerCommit
首先,我们看到当前包下的这个叫AdminFenceProducersTest类中,的testFenceAfterProducerCommit的方法
说明 这个集群实例是唯一的,因为他加上了final关键词进行修饰了,但是也仅仅是位置上面的唯一,也就是说这个类或者实例的内部的属性那些也是可以进行重新赋值和更改的。
private final ClusterInstance clusterInstance;
下面,这个是该类对方法的调用,这里我会一层层的调用进去,看看其实现的原理
layer 1
clusterInstance.createTopic(TOPIC_NAME, 1, (short) 1);layer 2
default void createTopic(String topicName, int partitions, short replicas) throws InterruptedException {createTopic(topicName, partitions, replicas, Map.of());
}layer 3
default void createTopic(String topicName, int partitions, short replicas, Map<String, String> props) throws InterruptedException {try (Admin admin = admin()) {admin.createTopics(List.of(new NewTopic(topicName, partitions, replicas).configs(props)));waitTopicCreation(topicName, partitions);}}
layer 1
方法的首次调用:参数分别为:主题名、分区数量、重复的数量
layer 2
加上了一个default关键字,那么这个就可以很好的保证我们在多接口实现过程中且遇到相同方法名的请求下,可以选择到一个默认的方法进行执行,此外关于继承了实现类又实现了接口,优先会获取到继承类的值。
layer 3
这里我们看到他引用了Map.of()【Java 9 引入的】来快速创建了小型的不可变映射集合,可以知道他在多层次的传参过程中,并不需要维护其可变的状态。
layer 4
default Admin admin() {return admin(Map.of(), false);
}layer 5
default Admin admin(Map<String, Object> configs, boolean usingBootstrapControllers) {Map<String, Object> props = new HashMap<>(configs);if (usingBootstrapControllers) {props.putIfAbsent(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, bootstrapControllers());props.remove(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);} else {props.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());props.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);}return Admin.create(setClientSaslConfig(props));
}layer 6
public String bootstrapControllers() {StringBuilder bld = new StringBuilder();String prefix = "";for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {int id = entry.getKey();ControllerServer controller = entry.getValue();ListenerName listenerName = nodes.controllerListenerName();// Although the KafkaConfig#listeners method normalizes the listener name,// the controller.listener.names configuration does not allow lowercase input,// so there is no lowercase controller listener name, and we don't need to normalize it.int port = controller.socketServer().boundPort(listenerName);if (port <= 0) {throw new RuntimeException("Controller " + id + " does not yet " +"have a bound port for " + listenerName + ". Did you start " +"the cluster yet?");}bld.append(prefix).append("localhost:").append(port);prefix = ",";}return bld.toString();}
layer 4
可以看到,这里还是一如既往的使用了 Map.of() 来进行参数的一个传递。
layer 5
在这里,初始化了一个Map,根据是否启用bootstarpControllers来对map进行不同属性来赋值。
layer 6
未完待续。。。。。。