当前位置: 首页 > article >正文

【C/C++】从零开始掌握Kafka

文章目录

  • 从零开始掌握Kafka
    • 一、Kafka 基础知识理解(理论)
      • 1. 核心组件与架构
      • 2. 重点概念解析
    • 二、Kafka 面试重点知识梳理
    • 三、C++ 使用 Kafka 的实践(librdkafka)
      • 1. librdkafka 简介
      • 2. 安装 librdkafka
    • 四、实战:高吞吐生产者与消费者
      • 1. 生产者示例(Producer.cpp)
      • 2. 消费者示例(Consumer.cpp)
    • 五、Kafka 开发相关 C++ 能力
    • 六、推荐资料与开源项目

从零开始掌握Kafka

一、Kafka 基础知识理解(理论)

1. 核心组件与架构

组件作用
BrokerKafka 节点,负责存储消息
Topic消息主题,逻辑上的分类
Partition一个 Topic 的分片,支持并发与扩展性
Producer负责发送消息
Consumer负责消费消息
Consumer Group多消费者协作消费
Zookeeper / KRaft负责元数据与协调(未来版本转向 KRaft 模式)

2. 重点概念解析

  • Partition:分片,支持水平扩展(每个 partition 是一个有序日志)。

  • 副本机制(Replication):每个 Partition 有一个 leader + N 个 follower,保证高可用。

  • 消费者组(Consumer Group):Kafka 实现广播和负载均衡消费的机制。

  • offset 管理

    • 自动提交(enable.auto.commit)
    • 手动提交(commitSync / commitAsync)
    • Kafka 默认 offset 存在 __consumer_offsets topic 中。

二、Kafka 面试重点知识梳理

面试点说明
消息顺序性同一个 partition 内有顺序,跨 partition 无法保证
幂等性生产使用 enable.idempotence=true,避免 producer 重试造成重复发送
分布式一致性ISR 机制,消息写入需同步到 follower;ACK=all 实现强一致
消费位点提交手动提交 offset 是保证消费语义精确一次的关键
Rebalance 原理消费者上下线会触发 Rebalance,导致 partition 分配变化

三、C++ 使用 Kafka 的实践(librdkafka)

1. librdkafka 简介

  • 官方提供的高性能 C/C++ Kafka 客户端库。

  • GitHub 地址:https://github.com/edenhill/librdkafka

  • 支持:

    • 高吞吐的生产与消费
    • offset 提交
    • topic/partition 管理
    • 幂等发送、压缩、批处理

2. 安装 librdkafka

# Ubuntu
sudo apt-get install librdkafka-dev# Or from source
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install

四、实战:高吞吐生产者与消费者

此处只是简单介绍,完整工程见kafka简单工程

1. 生产者示例(Producer.cpp)

#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <csignal>
#include <memory>class ExampleEventCb : public RdKafka::EventCb {void event_cb(RdKafka::Event &event) override {if (event.type() == RdKafka::Event::EVENT_ERROR) {std::cerr << "Kafka Error: " << event.str() << std::endl;}}
};int main() {std::string brokers = "kafka:9092";std::string topic_str = "test_topic";std::string errstr;// 配置ExampleEventCb event_cb;std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));conf->set("bootstrap.servers", brokers, errstr);conf->set("event_cb", &event_cb, errstr);// 创建 producerstd::unique_ptr<RdKafka::Producer> producer(RdKafka::Producer::create(conf.get(), errstr));if (!producer) {std::cerr << "Failed to create producer: " << errstr << std::endl;return 1;}// 创建 Topicstd::unique_ptr<RdKafka::Topic> topic(RdKafka::Topic::create(producer.get(), topic_str, nullptr, errstr));if (!topic) {std::cerr << "Failed to create topic: " << errstr << std::endl;return 1;}std::string message = "Hello from C++ Kafka Producer!";RdKafka::ErrorCode resp = producer->produce(topic.get(),                            // topic ptrRdKafka::Topic::PARTITION_UA,           // partitionRdKafka::Producer::RK_MSG_COPY,         // message flagsconst_cast<char *>(message.c_str()),    // payloadmessage.size(),                         // payload sizenullptr,                                // optional keynullptr);                               // opaqueif (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;} else {std::cout << "Message sent successfully\n";}producer->flush(3000);return 0;
}}

2. 消费者示例(Consumer.cpp)

#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <csignal>
#include <memory>bool running = true;void signal_handler(int) {running = false;
}class ExampleEventCb : public RdKafka::EventCb {void event_cb(RdKafka::Event &event) override {if (event.type() == RdKafka::Event::EVENT_ERROR) {std::cerr << "Kafka Error: " << event.str() << std::endl;}}
};int main() {signal(SIGINT, signal_handler);std::string brokers = "kafka:9092";std::string topic = "test_topic";std::string group_id = "cpp_consumer_group";std::string errstr;ExampleEventCb event_cb;auto conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);conf->set("bootstrap.servers", brokers, errstr);conf->set("group.id", group_id, errstr);conf->set("auto.offset.reset", "earliest", errstr);conf->set("event_cb", &event_cb, errstr);auto consumer = RdKafka::KafkaConsumer::create(conf, errstr);if (!consumer) {std::cerr << "Failed to create consumer: " << errstr << std::endl;return 1;}consumer->subscribe({topic});std::cout << "Consuming messages from topic " << topic << std::endl;while (running) {auto msg = consumer->consume(1000);if (msg->err() == RdKafka::ERR_NO_ERROR) {std::string message(reinterpret_cast<const char*>(msg->payload()), msg->len());std::cout << "Received message: " << message << std::endl;}}delete msg;}consumer->close();delete consumer;return 0;
}

五、Kafka 开发相关 C++ 能力

  • 熟练使用 RAII、智能指针、异常处理

  • 理解线程安全、异步模型(poll, callback)

  • 能够结合 JSON/XML 配置 Kafka 客户端

  • 编写模块化、高性能的消息收发组件

  • 构建系统:CMake

  • 日志:spdlog 或 glog

  • 单元测试:gtest

  • JSON:nlohmann/json


六、推荐资料与开源项目

  • 📚 Kafka 权威指南(原书第2版)

  • 📘 librdkafka 文档

  • 📖 Apache Kafka 官方文档

  • 💻 开源项目参考:

    • confluent-kafka-cpp
    • cppkafka(封装更现代 C++)
http://www.lryc.cn/news/2386057.html

相关文章:

  • 02_redis分布式锁原理
  • 简单血条于小怪攻击模板
  • Win11 系统登入时绑定微软邮箱导致用户名欠缺
  • 代码随想录算法训练营第四十六四十七天
  • 华硕FL8000U加装16G+32G=48G内存条
  • 前后端联调实战指南:Axios拦截器、CORS与JWT身份验证全解析
  • java高级 -Junit单元测试
  • 在 UVM验证环境中,验证 Out-of-Order或 Interleaving机制
  • V9数据库替换授权
  • 勇闯Chromium—— Chromium的多进程架构
  • Go语言中常量的命名规则详解
  • 软件质量保证与测试实验
  • 历年华东师范大学保研上机真题
  • 【C++】什么是静态库?什么是动态库?
  • 项目阅读:Instruction Defense
  • springboot中拦截器配置使用
  • 用 Python 构建自动驾驶的实时通信系统:让车辆“交流”起来!
  • 在机器学习中,L2正则化为什么能够缓过拟合?为何正则化等机制能够使一个“过度拟合训练集”的模型展现出更优的泛化性能?正则化
  • day36 python神经网络训练
  • k8s部署ELK补充篇:kubernetes-event-exporter收集Kubernetes集群中的事件
  • 【Excel VBA 】窗体控件分类
  • C++性能相关的部分内容
  • Spring Boot 项目中常用的 ORM 框架 (JPA/Hibernate) 在性能方面有哪些需要注意的点?
  • 基于大模型的大肠癌全流程预测与诊疗方案研究报告
  • 解决DeepSeek部署难题:提升效率与稳定性的关键策略
  • AI进行提问、改写、生图、联网搜索资料,嘎嘎方便!
  • GStreamer开发笔记(四):ubuntu搭建GStreamer基础开发环境以及基础Demo
  • 2021年认证杯SPSSPRO杯数学建模A题(第二阶段)医学图像的配准全过程文档及程序
  • CV中常用Backbone-3:Clip/SAM原理以及代码操作
  • RPC 协议详解、案例分析与应用场景