当前位置: 首页 > news >正文

消息中间件篇之Kafka-消息不丢失

一、 正常工作流程

        生产者发送消息到kafka集群,然后由集群发送到消费者。

        但是可能中途会出现消息的丢失。下面是解决方案。

二、 生产者发送消息到Brocker丢失

1. 设置异步发送

    //同步发送RecordMetadata recordMetadata = kafkaProducer.send(record).get();//异步发送kafkaProducer.send(record,new Callback() {@Override public void onCompletion (RecordMetadata recordMetadata, Exception e){if (e != null) {System.out.println("消息发送失败 | 记录日志");}long offset = recordMetadata.offset();int partition = recordMetadata.partition();String topic = recordMetadata.topic();}});

2.消息重试

//设置重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);

三、消息在Brocker中存储丢失

        发送确认机制acks。消息首先Topic是key,到达Topic以后才选择分区Partition(默认就一个分区,0号分区),默认连接的就是分区的Leader节点,由leader分区同步到follower区中。

四、消费者从Brocker接收消息丢失

1.分区机制

        1. Kafka 中的分区机制指的是将每个主题划分成多个分区(Partition)。

        2. topic分区中消息只能由消费者组中的唯一一个消费者处理,不同的分区分配给不同的消费者(同一个消费者组)。

2.消费方式

        消费者默认是自动按期提交已经消费的偏移量,默认是每隔5s提交一次,如果出现重平衡的情况,可能会重复消费或丢失数据

3.那如何解决重复消费

        禁用自动提交偏移量,改为手动: 1. 同步提交。  2. 异步提交。 3. 同步+异步组合提交。

       

五、面试题

面试官:Kafka是如何保证消息不丢失?

候选人:嗯,这个保证机制很多,在发送消息到消费者接收消息,在每个阶段都有可能会丢失消息,所以我们解决的话也是从多个方面考虑:

第一个是生产者发送消息的时候,可以使用异步回调发送,如果消息发送失败,我们可以通过回调获取失败后的消息信息,可以考虑重试或记录日志,后边再做补偿都是可以的。同时在生产者这边还可以设置消息重试,有的时候是由于网络抖动的原因导致发送不成功,就可以使用重试机制来解决。

第二个在broker中消息有可能会丢失,我们可以通过kafka的复制机制来确保消息不丢失,在生产者发送消息的时候,可以设置一个acks,就是确认机制。我们可以设置参数为all,这样的话,当生产者发送消息到了分区之后,不仅仅只在leader分区保存确认,在follwer分区也会保存确认,只有当所有的副本都保存确认以后才算是成功发送了消息,所以,这样设置就很大程度了保证了消息不会在broker丢失。

第三个有可能是在消费者端丢失消息,kafka消费消息都是按照offset进行标记消费的,消费者默认是自动按期提交已经消费的偏移量,默认是每隔5s提交一次,如果出现重平衡的情况,可能会重复消费或丢失数据。我们一般都会禁用掉自动提价偏移量,改为手动提交,当消费成功以后再报告给broker消费的位置,这样就可以避免消息丢失和重复消费了。

面试官:Kafka中消息的重复消费问题如何解决的?

候选人:kafka消费消息都是按照offset进行标记消费的,消费者默认是自动按期提交已经消费的偏移量,默认是每隔5s提交一次,如果出现重平衡的情况,可能会重复消费或丢失数据。我们一般都会禁用掉自动提价偏移量,改为手动提交,当消费成功以后再报告给broker消费的位置,这样就可以避免消息丢失和重复消费了。

为了消息的幂等,我们也可以设置唯一主键来进行区分,或者是加锁,数据库的锁,或者是redis分布式锁,都能解决幂等的问题。

http://www.lryc.cn/news/307419.html

相关文章:

  • Rust使用calamine读取excel文件,Rust使用rust_xlsxwriter写入excel文件
  • 中文文本分类(pytorch 实现)
  • 【每日前端面经】2023-02-27
  • springboot + easyRules 搭建规则引擎服务
  • Mac电脑配置环境变量
  • Windows系统x86机器安装(麒麟、统信)ARM系统详细教程
  • 消息中间件篇之RabbitMQ-高可用机制
  • express+mysql+vue,从零搭建一个商城管理系统5--用户注册
  • canvas水波纹效果,jquery鼠标水波纹插件
  • Zookeeper客户端命令、JAVA API、监听原理、写数据原理以及案例
  • [嵌入式系统-34]:RT-Thread -19- 新手指南:RT-Thread标准版系统架构
  • postman访问k8s api
  • UE4c++ ConvertActorsToStaticMesh
  • Qt中tableView控件的使用
  • 【医学影像】LIDC-IDRI数据集的无痛制作
  • MacOS开发环境搭建详解
  • 全量知识系统问题及SmartChat给出的答复 之2
  • 嵌入式驱动学习第一周——vim的使用
  • loop_list单向循环列表
  • Python爬虫实战第二例【二】
  • Eclipse是如何创建web project项目的?
  • Excel的中高级用法
  • 【ArcGIS】基本概念-空间参考与变换
  • Qt QWidget 简约美观的加载动画 第五季 - 小方块风格
  • 针对KZG承诺和高效laconic OT的extractable witness encryption
  • Spring Boot中实现列表数据导出为Excel文件
  • 华为ipv6 over ipv4 GRE隧道配置
  • 项目解决方案:海外门店视频汇聚方案(全球性的连锁店、国外连锁店视频接入和汇聚方案)
  • Java中的数据类型详解
  • ABBYY FineReader16文档转换、PDF管理与文档比较功能介绍