当前位置: 首页 > news >正文

Kafka Producer发送消息流程之消息异步发送和同步发送

文章目录

  • 1. 异步发送
  • 2. 同步发送

在这里插入图片描述

1. 异步发送

Kafka默认就是异步发送,在Main线程中的多条消息,没有严格的先后顺序,Sender发送后就继续下一条,异步接受结果。

public class KafkaProducerCallbackTest {public static void main(String[] args) throws InterruptedException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test2",""+i,"我是你爹"+i);//发送recordproducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");}//关闭producerproducer.close();}
}

Main线程中,对于多条数据,下一条消息的发送并不等待上一条消息的确认,而是继续发送。

2024-07-17 21:43:46.052 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BqIgDGtwTeeusL_ygHtn2w
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
2024-07-17 21:43:46.075 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:43:46.280 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 6000 with epoch 0
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
2024-07-17 21:43:46.569 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed

可以看到先是main线程循环发送完了多条数据,然后再异步收到通知。

2. 同步发送

消息有严格的先后顺序,下一条消息必须等到上一条消息的回调确认后,再发送,这是一个效率极低的过程。

按照流程图,上一条消息需要从生产者一直流转,多个步骤,到数据收集器,到Sender,最后还要等待回调确认,才可以开始下一条消息的流转。

public class KafkaProducerCallbackTest {public static void main(String[] args) throws InterruptedException, ExecutionException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test2",""+i,"我是你爹"+i);//发送recordFuture<RecordMetadata> send = producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");send.get();}//关闭producerproducer.close();}
}
2024-07-17 21:49:19.586 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 5000 with epoch 0
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
2024-07-17 21:49:19.823 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:49:19.838 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
http://www.lryc.cn/news/402062.html

相关文章:

  • Flutter 状态管理调研总结
  • 入门C语言只需一个星期(星期二)
  • 切换node版本
  • 【常见开源库的二次开发】基于openssl的加密与解密——Base的编解码(二进制转ascll)(二)
  • ssrf复习(及ctfshow351-360)
  • 请求通过Spring Cloud Gateway 503
  • C++代码_让室友坑我
  • AG32 的MCU与FPGA的主频可以达到568MHz吗
  • 怎样减少视频的容量 怎样减少视频内存保持清晰度
  • 谈一谈一条SQL的查询、更新语句究竟是如何执行的?
  • 自动驾驶AVM环视算法–全景和标定全功能算法实现和exe测试demo
  • 【Docker 系列】学习路线
  • 蓝色系信息工作室建站网站源码系统 带模版手机端 带完整的源代码包以及搭建部署教程
  • 什么是带宽限制,如何影响服务器数据传输?
  • RISC-V在线反汇编工具
  • 从零手写实现 nginx-32-load balance 负载均衡算法 java 实现
  • 基于STC89C51单片机的烟雾报警器设计(煤气火灾检测报警)(含文档、源码与proteus仿真,以及系统详细介绍)
  • SpringBoot整合阿里云RocketMQ对接,商业版
  • modbus slave 设备通过 网关thingsboard-gateway 将数据上传到thingsboard云平台
  • 安全防御:智能选路
  • Gitee使用教程2-克隆仓库(下载项目)并推送更新项目
  • Postfix+Dovecot+Roundcube开源邮件系统搭建系列1-2:系统搭建目标+MariaDB数据库配置(MySQL)
  • Flower花所比特币交易及交易费用科普
  • 1个Xpath定位可以在Web页面查找到多个元素Selenium
  • 智慧博物馆的“眼睛”:视频智能监控技术守护文物安全与智能化管理
  • vue中:class、watch、v-show使用
  • 中电金信-杭州工商银行|面试真题|2024年
  • 搞定前端面试题——ES6同步与异步机制、async/await的使用以及Promise的使用!!!
  • Redis数据结构--跳跃表 Skip List
  • 线状激光模组定制厂家哪家好?具体怎么收费?