当前位置: 首页 > news >正文

kafka如何保证数据不丢失

下面我将使用 Python 代码示例,从生产者、集群和消费者三个层面详细讲解 Kafka 如何保证数据不丢失。我们将使用kafka-python库来实现相关功能。

一、生产者层面的数据不丢失保证

生产者通过配置确认机制、重试策略和幂等性来确保数据不丢失。

from kafka import KafkaProducer
from kafka.errors import KafkaError
import timedef create_safe_producer():# 配置生产者属性producer = KafkaProducer(bootstrap_servers=['localhost:9092'],# 确保所有ISR中的副本确认消息acks='all',# 失败重试次数retries=3,# 重试间隔(毫秒)retry_backoff_ms=1000,# 开启幂等性,防止重复发送enable_idempotence=True,# 限制未确认请求数量,保证顺序max_in_flight_requests_per_connection=1,# 序列化器value_serializer=lambda v: str(v).encode('utf-8'))return producerdef send_message_safely(producer, topic, message):try:# 发送消息并等待确认(同步发送)future = producer.send(topic, message)# 等待服务器响应record_metadata = future.get(timeout=10)print(f"消息发送成功 - 主题: {record_metadata.topic}, "f"分区: {record_metadata.partition}, "f"偏移量: {record_metadata.offset}")return Trueexcept KafkaError as e:print(f"消息发送失败: {str(e)}")# 这里可以添加自定义的重试逻辑或持久化失败的消息return Falseexcept Exception as e:print(f"发送过程中发生错误: {str(e)}")return Falseif __name__ == "__main__":producer = create_safe_producer()topic = "safe_topic"try:# 发送测试消息for i in range(5):message = f"这是第{i+1}条需要确保不丢失的消息"success = send_message_safely(producer, topic, message)if not success:print(f"消息 '{message}' 发送失败,已记录待后续处理")time.sleep(1)finally:# 确保所有缓冲消息都被发送producer.flush()producer.close()

关键配置说明:

  • acks='all':最安全的配置,消息需被所有同步副本确认
  • retries=3:发送失败时自动重试 3 次
  • enable_idempotence=True:开启幂等性,确保重试不会导致消息重复
  • 同步发送:通过future.get()等待结果,确保知道消息是否发送成功

二、集群层面的数据不丢失保证

Kafka 集群通过副本机制和 ISR(同步副本集)来保证数据不丢失。

1. 集群配置(server.properties)

# 每个broker的唯一标识
broker.id=0# 日志存储路径
log.dirs=/tmp/kafka-logs# 确保数据不丢失的关键配置
default.replication.factor=3  # 新主题默认副本数
min.insync.replicas=2         # 最小同步副本数,与生产者acks=all配合# 副本同步配置
replica.lag.time.max.ms=30000  # 副本同步滞后的最大时间# 禁止非ISR副本成为领导者,避免数据丢失
unclean.leader.election.enable=false# 日志保留策略
log.retention.hours=168  # 日志保留时间# Zookeeper连接
zookeeper.connect=localhost:2181

2. 创建高可用主题(Python 代码)

from kafka.admin import KafkaAdminClient, NewTopicdef create_safe_topic():# 连接到Kafka集群admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092",client_id='topic_creator')# 定义主题配置,确保高可用topic_name = "safe_topic"num_partitions = 3  # 分区数replication_factor = 3  # 每个分区的副本数# 创建主题topic_list = [NewTopic(name=topic_name,num_partitions=num_partitions,replication_factor=replication_factor,# 额外配置configs={'min.insync.replicas': '2'  # 此主题的最小同步副本数})]try:# 创建主题admin_client.create_topics(new_topics=topic_list, validate_only=False)print(f"主题 '{topic_name}' 创建成功,分区数: {num_partitions}, 副本数: {replication_factor}")except Exception as e:print(f"创建主题失败: {str(e)}")finally:admin_client.close()if __name__ == "__main__":create_safe_topic()

关键配置说明:

  • default.replication.factor=3:每个分区默认有 3 个副本,分布在不同 broker 上
  • min.insync.replicas=2:与生产者 acks='all' 配合,确保至少 2 个副本确认接收消息
  • unclean.leader.election.enable=false:防止非同步副本成为领导者,避免数据丢失

三、消费者层面的数据不丢失保证

消费者通过手动提交偏移量和异常处理来确保数据不丢失。

from kafka import KafkaConsumer
from kafka.errors import KafkaError
import timedef create_safe_consumer(group_id):# 配置消费者属性consumer = KafkaConsumer('safe_topic',bootstrap_servers=['localhost:9092'],group_id=group_id,# 禁用自动提交偏移量enable_auto_commit=False,# 没有偏移量时从最早的消息开始消费auto_offset_reset='earliest',# 反序列化器value_deserializer=lambda m: m.decode('utf-8'),# 拉取超时时间consumer_timeout_ms=10000)return consumerdef process_message(message):"""处理消息的业务逻辑"""# 模拟处理时间time.sleep(0.5)print(f"处理消息: {message}")# 这里可以添加实际的业务逻辑# 如果处理失败,可以抛出异常# if some_condition:#     raise Exception("处理失败")return Truedef consume_messages_safely(consumer):try:while True:# 拉取消息messages = consumer.poll(timeout_ms=1000)if not messages:continueall_processed = True# 处理每个分区的消息for partition, records in messages.items():for record in records:try:# 处理消息success = process_message(record.value)if not success:all_processed = Falseprint(f"消息处理失败: {record.value}")except Exception as e:all_processed = Falseprint(f"处理消息时发生错误: {str(e)}, 消息: {record.value}")# 可以将失败的消息发送到死信队列# send_to_dead_letter_queue(record)# 只有所有消息都处理成功后才提交偏移量if all_processed:consumer.commit()print("偏移量已提交")else:print("部分消息处理失败,不提交偏移量")except KafkaError as e:print(f"消费过程中发生Kafka错误: {str(e)}")except Exception as e:print(f"消费过程中发生错误: {str(e)}")finally:consumer.close()if __name__ == "__main__":consumer = create_safe_consumer("safe_consumer_group")consume_messages_safely(consumer)

关键配置说明:

  • enable_auto_commit=False:禁用自动提交,由应用控制何时提交偏移量
  • auto_offset_reset='earliest':无偏移量时从最早消息开始消费
  • 手动提交:只有当所有消息处理成功后才调用consumer.commit()
  • 异常处理:捕获处理过程中的异常,确保失败时不提交偏移量

总结

Kafka 保证数据不丢失需要三个层面的协同工作:

  1. 生产者:通过acks='all'等待所有同步副本确认,设置重试机制,并使用同步发送确保消息成功投递
  2. 集群:通过多副本机制,设置合理的副本数和最小同步副本数,防止非同步副本成为领导者
  3. 消费者:通过手动提交偏移量,确保消息处理成功后再提交,并妥善处理异常情况

这三个层面的配置相互配合,才能构建一个可靠的 Kafka 系统,确保数据在各种异常情况下都不会丢失。

http://www.lryc.cn/news/599285.html

相关文章:

  • 2025年7月25日训练日志
  • Elasticsearch-8.17.0 centos7安装
  • Flink 自定义类加载器和子优先类加载策略
  • 第一章:Go语言基础入门之流程控制
  • k8s-MongoDB 副本集部署
  • 呼叫中心系统管理权限功能配置
  • gig-gitignore工具实战开发(四):使用ai辅助生成gitignore
  • 熵与交叉熵:从信息论到机器学习的「不确定性」密码
  • 有关于k8s中的CSI和CRI的有关知识
  • [硬件电路-85]:一款高集成度热电制冷器(TEC)控制器芯片ADN8835ACPZ
  • 【Docker项目实战】在Docker环境下部署go-file文件分享工具
  • SaaS型小程序自动化发布解决方案
  • 飞行控制领军者 | 边界智控携高安全级飞控系统亮相2025深圳eVTOL展
  • 大型微服务项目:听书——11 Redisson分布式布隆过滤器+Redisson分布式锁改造专辑详情接口
  • 巧用Proxy与异步编程:绕过浏览器安全限制实现文件选择器触发
  • 秋招Day19 - 分布式 - 分布式锁
  • 线性代数 下
  • 关于回归决策树CART生成算法中的最优化算法详解
  • 机器学习笔记(三)——决策树、随机森林
  • 水库大坝安全监测的主要内容
  • 在VSCode配置Java开发环境的保姆级教程(适配各类AI编程IDE)
  • 【内网穿透】使用FRP实现内网与公网Linux/Ubuntu服务器穿透项目部署多项目穿透方案
  • SSSD介绍
  • 【阅读整理】野火ADC_AD7192模块资料
  • “即时零售”风起,E3+企业中台如何赋能品牌企业破局增长?
  • CIU32L051 DMA+Lwrb环形队列实现串口无阻塞性数据的收发 + 数据百分百不丢失的实现
  • 百度蜘蛛池解析机制:原创
  • 通用CI/CD软件平台TeamCity v2025.3全新发布——主要界面交互体验升级
  • AWS CAF:企业云转型的战略指南
  • 佳能iR-ADV C5560复印机如何扫描文件到电脑