当前位置: 首页 > news >正文

kafka生产者2

1.数据可靠

• 0:生产者发送过来的数据,不需要等数据落盘应答。 

风险:leader挂了之后,follower还没有收到消息。。。。

• 1:生产者发送过来的数据,Leader收到数据后应答。

风险:leader应答完成之后,还没有开始同步副本。。。。

 

 生产者发送过来的数据,Leader和ISR队列里面 的所有节点收齐数据后应答。

可靠性总结: acks=0,生产者发送过来数据就不管了,可靠性差,效率高; acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等; acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低; 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。

// 设置 acksproperties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数 retries,默认是 int 最大值,2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);

2.数据去重 

 

3.生产者事务

 说明:开启事务,必须开启幂等性。

package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducerTransaction {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"tranactional_id_01");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = newKafkaProducer<String, String>(properties);kafkaProducer.initTransactions();kafkaProducer.beginTransaction();// 4. 调用 send 方法,发送消息try{for (int i = 0; i < 5; i++) {kafkaProducer.send(newProducerRecord<>("first","atguigu " + i));int i1 = 1 / 0;}kafkaProducer.commitTransaction();}catch (Exception e){kafkaProducer.abortTransaction();}finally {// 5. 关闭资源kafkaProducer.close();}}
}

4.数据乱序

 

http://www.lryc.cn/news/306177.html

相关文章:

  • 【LNMP】云导航项目部署及环境搭建(复杂)
  • nginx之状态页 日志分割 自定义图表 证书
  • 数字人的未来:数字人对话系统 Linly-Talker + 克隆语音 GPT-SoVITS
  • SpringMVC 学习(五)之域对象
  • ✅技术社区项目—JWT身份验证
  • 5.2 Ajax 数据爬取实战
  • 276.【华为OD机试真题】矩阵匹配(二分法—JavaPythonC++JS实现)
  • java——多线程基础
  • Python服务器监测测试策略与工具:确保应用的高可用性!
  • Spring Security源码学习
  • 大数据面试总结三
  • AI赚钱套路总结和教程
  • Linux安装jdk、tomcat、MySQL离线安装与启动
  • Python爬虫-使用代理伪装IP
  • Typora结合PicGo + 使用Github搭建个人免费图床
  • 【Redis】redis简介与安装
  • 【xss跨站漏洞】xss漏洞利用工具beef的安装
  • 编程笔记 html5cssjs 086 JavaScript 内置对象
  • AttributeError: ‘DataFrame‘ object has no attribute ‘set_value‘怎么修改问题的解决
  • Jmeter内置变量 vars 和props的使用详解
  • c#高级-正则表达式
  • 说说UE5中的几种字符串类
  • (done) 如何判断一个矩阵是否可逆?
  • 洗眼镜用的超声波清洗机哪一家更好一点?好用超声波清洗机排名
  • (二十二)Flask之上下文管理第三篇【收尾—讲一讲g】
  • 五种多目标优化算法(MOGWO、MOJS、NSWOA、MOPSO、MOAHA)性能对比,包含6种评价指标,9个测试函数(提供MATLAB代码)
  • istio实战:springboot项目在istio中服务调用
  • 随机分布模型
  • Visual Studio:Entity设置表之间的关联关系
  • 每日五道java面试题之spring篇(二)