当前位置: 首页 > news >正文

【车联网kafka】Kafka核心架构与实战经验(第一篇)

目录

一、我与kafka的缘分-初识Kafka

二、Kafka深入探讨-了解kafka

​编辑2.1 kafka 生产者框架

2.1.1 生产者在生活中的实例

2.1.2 kafka生产者流程及框架

1. 主线程处理阶段

2. Sender线程处理阶段

设计优势总结

2.2 kafka 生产者框架中的一些关键参数

2.3 kafka 生产者框架中一些关键问题

2.3.1 kafka如何对消息进行统一处理呢

2.3.2 kafka如何知道发送给哪个分区呢,有哪些分区分配策略?如何发送消息到固定分区?

常见的Kafka生产者分区策略总结

2.3.3 主要用到的序列化方式

2.3.4 如何保证消息发送的一致性、ack如何设置?

第一、ack的设置

第二、确保消息发送

选择建议与权衡

2.3.5 如何保证消息发送的幂等性

示例配置建议

2.3.6 kafka发送的retry机制

2.3.7 如果发送的消息比较大怎么办呢

1. 处理超过batch size的消息(默认16KB)

2. 处理超过max.request.size的消息(默认1MB)

3. 处理非常大的消息(如10MB或更大)

整体优化建议总结

OK,今天先到这里后面几篇会继续把kafka讲完


一、我与kafka的缘分-初识Kafka

     在车联网平台的实际应用中,我们采用 Kafka 作为核心消息中间件实现系统解耦,主要处理车辆 T-Box 设备上报的报文数据。当前平台已接入 30 万辆车辆,以每10秒 1 包的频率持续上传数据。按此规模计算:

  • 单日数据量:单天的数据量在2.8亿条
  • 车辆在线率:   30%-40%

选择 Kafka 主要基于两个关键考量:

  1. 海量数据承载能力:其分布式架构与零拷贝机制可稳定支撑 500000 量级/秒的写入吞吐
  2. 业务容错特性:车辆数据场景允许部分消息丢失(如网络闪断),而 Kafka 的异步刷盘机制在保证吞吐的同时,恰能满足该容忍度

      通过 Kafka 的缓冲削峰,我们有效隔离了数据生产端(车辆上行)与消费端(业务处理系统)。此前虽已实际应用,但尚未深入技术原理。现借此机会,我们梳理一下kafka。

使用场景

二、Kafka深入探讨-了解kafka

        当谈到Apache Kafka时,我认为它就是一个高性能的分布式数据流平台,类似于一个超级大的临时流转的存储仓库,能够高效处理海量实时流转数据。不过,它有两个核心特点:

        第一:数据通过生产者"前门进",通过消费者"后门出",形成连续的数据管道;

        第二:由于分布式系统的特性,数据在极端情况下可能会丢失(例如,在故障或配置不当的场景)。

        因此,对于需要强数据一致性的应用场景(如金融交易或实时记账系统),Kafka 往往不太合适,因为它更侧重于高吞吐量和最终一致性。

2.1 kafka 生产者框架

2.1.1 生产者在生活中的实例

        想象一下,Kafka的发送者就像是一个送货员,他的任务很简单:把货物(消息)送到指定地方就行。但如果您是负责设计整个送货系统的“老板”,就不能只盯着送货员的工作了——您得操心整个链条的细节,确保一切高效可靠。这涉及到一系列关键问题:

  1. 货物要送给谁?
    就像送货前得知道收货地址一样,您需要确定消息具体要发给哪些人或系统。

  2. 运输方式怎么选?
    是用小车(高效工具)快速送,还是自己走路(慢速方式)慢慢送?这会影响送货速度和成本。

  3. 货物需要整理或压缩吗?
    如果用小车送,是不是得把货物打包整齐?或者压缩一下,节省空间,让一次能送更多货?

  4. 怎么确保货物送到仓库?
    万一路上出问题,比如车子坏了,如何保证货物最终安全到达?需要什么保险措施?

  5. 怎么避免一个货物送两次?
    如何确保每个货物只送一次,不会重复发送,造成混乱或浪费?

  6. 送货前要检查货物吗?
    是否需要先过滤危险品(如有害数据),确保只送安全、有用的货物?

  7. 送货路径怎么安排?
    是直接送到最终目的地(总部),还是先送到中转站(配送点),再转交过去

        作为系统设计者,您得像一个精明的老板一样,从全局角度思考这些细节,而不是只关注送货员本身的动作。那么kafka其实就是一个精明的老板,所有的这些细节都考虑到位了,那么我们可以看一下kafka得“送货系统”是如何设计的呢?

2.1.2 kafka生产者流程及框架

kafka发送架构

        在Kafka生产者中,消息发送过程由两条线程协同完成:主线程负责消息的预处理和暂存,Sender线程负责消息的发送。这种设计通过批量处理机制(称为消息批次)显著提高了吞吐量和效率。以下是详细流程:

1. 主线程处理阶段
  • 消息初始化:主线程生成消息后,首先通过消息拦截器进行转换(例如,添加元数据或过滤)。
  • 序列化:转换后的消息必须经过序列化(将对象转换为字节流),以满足网络传输要求。
  • 分区选择:序列化后的消息通过分区器确定目标分区(主题是逻辑概念,实际数据存储在分区中)。分区器基于键或策略选择合适的分区。
  • 批次积累:消息进入消息累加器等待,与其他消息组合成批次。这种批量机制减少了网络开销,提升整体吞吐量。
2. Sender线程处理阶段
  • 批次提取:当批次就绪(例如,达到大小或时间阈值),Sender线程从消息累加器中提取完整批次。
  • 请求封装:提取的批次被封装成Request对象,并通过NetworkClient进行排队和传输。
  • 发送执行:NetworkClient将请求发送到Kafka集群,确保可靠性和效率。
设计优势总结

这种线程分离设计(主线程处理本地逻辑,Sender线程处理网络I/O)避免了阻塞,并利用批次机制优化资源使用。消息批次是Kafka高吞吐的核心,它通过减少小消息的单独发送,降低了延迟并提升了集群性能。

2.2 kafka 生产者框架中的一些关键参数

参数名称描述
key.serializer 和 value.serializer指定发送消息的 key 和 value 的序列化类型。必须使用全类名。
buffer.memoryRecordAccumulator 缓冲区总大小,默认值为 32m。
batch.size缓冲区一批数据最大值,默认值为 16k。适当增加该值可提高吞吐量,但设置过大可能导致数据传输延迟增加。
linger.ms如果数据未达到 batch.size,sender 在等待该时间后发送数据。单位为 ms,默认值 0ms(无延迟)。生产环境建议设置为 5-100ms。
acks应答机制:0-生产者发送数据后不需落盘应答;1-Leader 收到数据后应答;-1(all)-Leader 和 isr 队列所有节点收齐数据后应答。默认值 -1,等价于 all。
max.in.flight.requests.per.connection允许最多未返回 ack 的次数,默认为 5。开启幂等性时,该值需在 1-5 范围内。
retries消息发送错误时系统重发次数,默认值为 int 最大值(2147483647)。若需保证消息有序性,需设置 max.in.flight.requests.per.connection=1,否则重试失败消息时其他消息可能已发送成功。
retry.backoff.ms两次重试之间的时间间隔,默认值为 100ms。
enable.idempotence是否开启幂等性,默认值为 true(开启)。
compression.type生产者发送数据的压缩方式,默认值为 none(不压缩)。支持类型:none、gzip、snappy、lz4 和 zstd。

2.3 kafka 生产者框架中一些关键问题

2.3.1 kafka如何对消息进行统一处理呢

通过 kafka 生产者的 拦截器进行消息的转化处理。一般我不会使用,毕竟消息都是处理好发送的,没必要在拦截一层在进行处理。

        KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作。一般来说最好不要修改消息 ProducerRecord 的 topic、key 和partition 等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏差。比如修改key不仅会影响分区的计算,同样会影响broker端日志压缩(Log Compaction)的功能。

        KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法,优先于用户设定的 Callback 之前执行。这个方法运行在Producer 的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。

        close()方法主要用于在关闭拦截器时执行一些资源的清理工作。

2.3.2 kafka如何知道发送给哪个分区呢,有哪些分区分配策略?如何发送消息到固定分区?

        在Kafka中,组装ProducerRecord消息时,可以通过判断消息的key或value来显式指定分区号。这种方式在技术上可行,但不推荐,因为它会将分区选择逻辑与消息组装过程耦合,导致代码可维护性降低。例如,业务逻辑变更可能直接影响分区策略,增加系统复杂度。

1. 未指定分区号时的默认行为,如果未显式指定分区号(例如:ProducerRecord record = new ProducerRecord("test-topic", "key-" + i, s);),Kafka会使用默认的DefaultPartitioner分区器:

2.如果我们在进行消息组装的时候,指定了分区号(例如:ProducerRecord record = new ProducerRecord("test-topic",1,"key-" + i,s),那么,kafka就不会使用分区器。

常见的Kafka生产者分区策略总结
  1. 默认分区策略(Default Partition Strategy)

    • 基于消息的key决定分区:如果key不为null,则使用key的哈希值(例如,通过哈希函数计算分区索引);如果key为null,则自动切换到轮询策略。
    • 优点:简单高效,适用于大多数场景,能保证相同key的消息分配到同一分区以实现有序性。
    • 缺点:当key分布不均匀时,可能导致分区负载不均衡。
  2. 轮询策略(Round-Robin Partition Strategy)

    • 当消息key为null时,生产者自动使用此策略:消息依次循环发送到所有可用分区(例如,分区0、1、2...然后重复)。
    • 优点:确保消息均匀分布,避免单个分区过载;适用于无key或低顺序要求的场景。
    • 缺点:不保证相同key的消息顺序性,可能影响某些应用的一致性需求。
  3. 自定义分区器(Custom Partitioner)

    • 用户可通过实现Partitioner接口(如Java中的org.apache.kafka.clients.producer.Partitioner)自定义逻辑,基于消息key、value或其他属性(如时间戳或业务ID)计算分区。
    • 优点:高度灵活,能适配复杂需求(如根据地理区域或用户ID分区);支持集成外部系统。
    • 缺点:需要额外开发,可能引入性能开销;需确保逻辑正确以避免分区热点。
  4. 一致性哈希分区(Consistent Hashing)

    • 非Kafka原生支持,但可通过自定义分区器实现:使用一致性哈希算法(例如,基于虚拟节点)在多个维度(如多个key或属性)上均匀分布数据,减少分区重平衡时的数据迁移。
    • 优点:提升扩展性和容错性,特别适合动态集群环境;能平衡负载并减少“热点”问题。
    • 缺点:实现复杂,需自定义开发;哈希计算可能增加延迟。

    2.3.3 主要用到的序列化方式

            我们公司这边对key和value都采用String序列化方式。对于复杂的对象,我们一般使用json转化包将我们复杂的对象转化成 json字符串,然后就可以使用我们的String的序列化方式了

    2.3.4 如何保证消息发送的一致性、ack如何设置?

    第一、ack的设置

            Kafka的acks参数是生产者端的配置,用于控制消息副本的写入确认级别。它影响消息的持久性和系统吞吐量,具体分为三个级别:

    • acks = 0:生产者不等待任何服务器确认。这提供了最高吞吐量(延迟最低),但安全性最低。如果服务器故障,消息可能丢失。适用场景包括:
      • 网站点击量统计
      • 页面停留时间记录
      • 视频播放量追踪(这些场景容忍少量数据丢失)
    • acks = 1:默认设置。生产者等待首领节点(Leader)确认写入成功即可。相比acks=0,吞吐量略有下降,但可靠性提高(首领节点故障时可能丢失消息)。适合大多数实时数据处理,如用户行为分析。
    • acks = all(或acks = -1):生产者等待所有副本节点(包括ISR列表)都确认写入成功。这提供最高可靠性(消息几乎不会丢失),但吞吐量最低、延迟最高。适用场景包括:
      • 金融交易记录
      • 关键事件审计(如支付确认)
    第二、确保消息发送

    三种消息发送确认机制的优劣比较:

    机制类型吞吐量可靠性实时反馈资源消耗适用场景
    消息拦截器

    (onAcknowledgement方法)

    实时需要每条消息精确追踪的场景
    同步发送(Future.get)实时强一致性要求的低频场景
    异步回调(Callback)异步高吞吐量要求的业务场景

    采用第三种方式往往可以具备高吞吐,推荐 

    import org.apache.kafka.clients.producer.*;
    import java.util.Properties;public class KafkaProducerOptimized {public static void main(String[] args) {// 1. 配置Producer属性Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("batch.size", 16384); // 批量大小,单位字节,优化吞吐量props.put("linger.ms", 5); // 发送延迟,允许批量累积,提高吞吐// 2. 创建Producer实例try (Producer<String, String> producer = new KafkaProducer<>(props)) {// 3. 发送消息,并添加Callbackfor (int i = 0; i < 10; i++) { // 示例:发送多条消息ProducerRecord<String, String> record = new ProducerRecord<>("your-topic", "key-" + i, "value-" + i);producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception instanceof RetriableException) {// 可重试异常} else {// 不可重试异常,记录并放弃}
    }}});}} catch (Exception e) {// 全局异常处理System.err.println("Producer异常: " + e.getMessage());}}
    }
    选择建议与权衡

    没有固定“最佳”设置,需根据业务需求平衡性能和安全性:

    • 性能优先:选择acks=0,适用于高吞吐、低延迟场景(如监控数据)。
    • 可靠性优先:选择acks=all(Kafka),适用于数据一致性要求高的系统(如库存管理)。
    • 一般场景acks=1(Kafka)是良好折衷,兼顾效率和基本可靠性。

    2.3.5 如何保证消息发送的幂等性

            在分布式系统中,使用 Apache Kafka 进行消息传递时,保证消息发送的幂等性至关重要。幂等性确保无论操作执行多少次(例如,由于网络重试或故障恢复),消息处理结果都不会改变,从而避免数据重复和不一致。Kafka 通过以下核心机制实现这一目标:

    1. 生产者配置

      • 启用幂等性:设置 enable.idempotence=true,生产者会为每个消息分配一个唯一的 PID(生产者 ID)和一个单调递增的序列号(例如,序列号从 0 开始递增)。这样,Kafka broker 可以识别并丢弃重复消息。
      • 确认模式:推荐设置 acks=all,确保生产者收到所有副本的确认,提升可靠性。这与幂等性结合使用,能有效处理网络问题导致的重发。
    2. 序列号和 PID 管理

      • 每个生产者实例分配一个唯一 PID,每个消息附带一个序列号。Kafka broker 基于 PID 和序列号检查消息是否重复,如果序列号不连续或重复,则拒绝处理。
    3. 存储机制

      • Kafka 的日志存储设计(如分区日志文件)持久化消息状态。通过记录序列号和 PID,即使在 broker 故障恢复后,也能从日志中恢复并避免重复处理。
    4. 事务支持

      • 事务本身不直接提供幂等性,但可与幂等性结合使用(例如,设置 enable.idempotence=true 并开启事务)。这确保了跨分区操作的原子性和一致性,适用于复杂场景(如多消息事务提交)。
    5. 客户端库支持

      • 使用最新 Kafka 客户端库(如 Kafka Java 客户端),确保实现无缺陷。早期版本可能不支持某些功能,因此升级库版本是必要的最佳实践。
    示例配置建议
    enable.idempotence=true  # 启用幂等性
    acks=all                 # 确保所有副本确认
    retries=2147483647       # 设置高重试次数(接近最大值),配合幂等性处理网络重试
    注意事项:幂等性仅针对单个生产者实例的单个分区有效。如果涉及多分区或复杂事务,需额外配置事务管理。实际部署中,测试重试场景以验证配置可靠性。

    2.3.6 kafka发送的retry机制

            一般情况下 我们这边没有使用过kafka自己的retry机制,我们通过kafka的callback机制对发送失败的消息进行监听,然后再次进行发送(失败的消息保存到db中,日志中)

            如果使用retry可以通过retries 参数控制重试的次数,通过retry.backoff.ms控制重试次数之间的时间间隔。间隔是500ms,之所以设置为500ms,kafka中topic分区的副本首领选举的整个过程是500ms以内完成的

            kafka的retry这机制,不是每次都retry的,如果收到了这样的error:

            消息大小超过了request.max.size 或者超过了message.max.bytes 类似这样的错误,kafka是不会选择重试的因为这种错误是无法通过重试而成功的

            如果是因为网络延迟、网络抖动啊、分区的一系列暂时不可用啊,这种错误kafka认为有可能在重试的过程中成功。

    2.3.7 如果发送的消息比较大怎么办呢

            我们这边消息在1MB以下,Kafka的设计初衷是处理高吞吐量的小消息,对于大消息(如超过16KB或1MB)存在性能瓶颈,需要针对性调整参数或采用替代方案。以下是分步优化建议:

    1. 处理超过batch size的消息(默认16KB)
    • Kafka的batch size默认为16KB。如果单个消息超过16KB,消息累加器会直接发送该消息,而不是将其拆分成多个batch组装。这避免了消息分割的开销,但引入了内存性能问题:
      • 对于小于或等于16KB的消息,Kafka复用java.io.ByteBuffer(一个固定大小的缓冲区),减少了内存申请和GC压力。
      • 对于大于16KB的消息,Kafka无法复用ByteBuffer,每次都需要申请新内存空间(大小等于消息本身)。这会导致频繁的内存分配和垃圾回收,影响性能(如吞吐量下降、延迟增加)。
    • 优化建议:适当调大batch size(例如调整为32KB或64KB),通过生产者配置参数batch.size设置。这可以增加消息复用的概率,减少内存开辟开销。但需注意,batch size过大可能增加延迟(等待更多消息填充batch),需根据业务场景平衡。
    2. 处理超过max.request.size的消息(默认1MB)
    • Kafka生产者参数max.request.size限制单条消息最大大小(默认1MB)。如果消息超过此限制(如2MB),仅调整该参数不够,还需协调broker和消费者配置:
      • 生产者端:将max.request.size调整为2MB(或更大),确保生产者能发送大消息。
      • Broker端:Broker通过message.max.bytes参数控制接收的最大消息大小(默认1MB)。需将其至少调整为2MB,以匹配生产者设置。否则,broker会拒绝过大消息。
      • 消费者端:消费者通过fetch.max.bytes参数控制每次拉取的最大消息大小(默认1MB)。需将其至少调整为2MB,确保消费者能接收和处理大消息。
      • 经验策略:设置参数时,遵循max.request.size < message.max.bytes < fetch.max.bytes(例如,生产者2MB、broker 2.1MB、消费者2.2MB)。这避免因参数不一致导致消息拒绝或失败。调整后,重启Kafka集群生效。
    3. 处理非常大的消息(如10MB或更大)
    • Kafka不适合处理超大消息(如10MB以上),因为:
      • 内存开销大:每次发送都需要申请新空间,增加GC压力。
      • 网络和存储瓶颈:大消息传输慢,可能阻塞broker线程,影响整体吞吐量。
      • 设计局限:Kafka优化于小消息流式处理,大消息会破坏其性能优势。
    • 替代方案:优先考虑非Kafka组件,如:
      • 使用文件传输协议(如SFTP、HTTP)或网络附加存储(NAS)传输大文件。
      • 将大消息存储在对象存储(如S3),然后通过Kafka发送文件引用(如URL)。
    • 如果必须使用Kafka的优化策略
      • 策略1:消息拆分与顺序保证
        • 将大消息拆分成多个小消息(每个小于batch size或max.request.size)。
        • 生产者指定所有小消息发送到同一分区(使用相同key),并采用单线程发送,确保消息顺序(如先发消息头,再发消息体)。
        • 消费者端按顺序接收并组装小消息还原为大消息。这需要应用层逻辑支持,但能避免Kafka大消息的性能问题。
      • 策略2:消息压缩
        • 启用生产者压缩,通过参数compression.type设置压缩算法(如snappy、gzip或lz4)。压缩可减少消息大小(例如,gzip压缩率可达70%),降低网络和内存开销。
        • 示例:设置compression.type=gzip,生产者自动压缩消息,消费者自动解压。但需注意,压缩会增加CPU开销,需测试性能影响。
    整体优化建议总结
    • 预防为主:在业务设计阶段,避免发送大消息(如超过1MB)。优先拆分或压缩消息。
    • 参数调优
      • 调大batch.size(如32KB)以优化内存复用。
      • 对大消息(>1MB),协调调整max.request.sizemessage.max.bytesfetch.max.bytes,确保三者匹配且略递增。
      • 监控Kafka性能(如使用JMX或Kafka监控工具),根据指标调整参数。
    • 性能权衡:调大参数可能增加内存使用和延迟,测试环境验证后再上线。
    • 大消息处理原则:对于>10MB消息,强烈建议使用替代方案。如果必须用Kafka,结合拆分和压缩策略。

    通过以上优化,可缓解Kafka大消息的性能问题,但核心是控制消息大小在合理范围内(如<1MB),以发挥Kafka的高吞吐优势。

    OK,今天先到这里后面几篇会继续把kafka讲完

    http://www.lryc.cn/news/603895.html

    相关文章:

  • 13、select_points_object_model_3d解析
  • 【2025年7月29日】TrollStore巨魔商店恢复在线安装
  • 通缩漩涡中的测量突围:新启航如何以国产 3D 白光干涉仪劈开半导体成本困局?
  • 磁悬浮转子同频振动:自适应陷波器设计与稳定性深度解析(附MATLAB代码)
  • 开源数据库PostgreSQL专家技术
  • AI药师助手 + 药品图谱系统完整操作分析(python版)
  • 基于AI代码疫苗技术的开源软件供应链安全治理
  • 出现错误,Microsoft store初始化失败。请尝试刷新或稍后返回。
  • 多模态融合 + 动态记忆机制,突破模态壁垒,超火研究方向
  • Xilinx高性能低延时PCIe-DMA控制器IP,SGDMA,QDMA,RDMA,CDMA,V4L2驱动,视频采集、AD采集
  • C#基础篇 - 正则表达式入门
  • 在Word和WPS文字中让文字无极限缩放,用键盘更高效
  • C51 中断
  • Python批量生成N天前的多word个文件,并根据excel统计数据,修改word模板,合并多个word文件
  • 理解“无界队列”与“有界队列”及其适用场景
  • git使用lfs解决大文件上传限制
  • 2411.按位或最大的最小子数组长度
  • gTest测试框架的安装与配置
  • 三、Linux用户与权限管理详解
  • 【目标检测】小样本度量学习
  • 量子计算革命:重新定义计算的边界与未来
  • DNS污染与劫持
  • Python爬虫02_Requests实战网页采集器
  • MoR vs MoE架构对比:更少参数、更快推理的大模型新选择
  • Ubuntu20.04子系统
  • Oracle发布MCP Server,自然语言交互说“人话”
  • AUTOSAR Mcal Gpt - 模块介绍
  • LeetCode|Day29|1009. 十进制整数的反码|Python刷题笔记
  • Jenkins 详解
  • Java 大视界 -- Java 大数据机器学习模型在金融信用评级模型优化与信用风险动态管理中的应用(371)