当前位置: 首页 > news >正文

kafka消息发送几种方式

同步发送 or 异步发送

       消息发送根据是否需要处理发送的结果分为同步发送、异步发送。

同步发送:等待发送结果返回,这种方式是可靠的,因为异常能及时处理,但同步发送需要阻塞等待一条消息发送完才处理下一条,吞吐量差。


异步发送:发送是异步的,不关心发送的结果,吞吐量最高,但可能存在发送失败的情况。

    本质上kafka 客户端提供的发送接口都是异步的,因为发送接口返回的是一个Future对象。对于同步发送通过future.get获取发送结果。异步发送则忽略send 返回值。

ListenableFuture<SendResult> future = kafkaTemplate.send(topic, content);try {SendResult sendResult = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}

发送完成回调

有没有办法既要异步发送还要能处理发送失败的场景,这就是第三种,发送完成时,执行相应的回调方法。这是折中方案,兼顾效率且保证发送失败能被监控到。

producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null){System.out.println("send error ");
}else {System.out.println("send result  topic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "  offset=" + recordMetadata.offset() );
}}
});

发送异常

       有些发送异常可以通过重试几次后解决,比如网络异常,对于有些异常比如消息太大超出kafka配置的最大消息字节数,这类异常重试也会失败,所以这类异常KafkaProducer 不会进行任何重试。对于可重试异常可以配置重试次数

spring.kafka.producer.retries=10

SpringBoot 集成简单介绍

     参考上篇文章SpringBoot 集成配置(pom依赖、application配置),简单讲解SpringBoot 几个重要自动装配类。

KafkaAutoConfiguration

KafkaAutoConfiguration给我们自动配置了几个类

KafkaTemplate:可以通过KafkaTemplate进行发送消息,本质上内部还是使用的KafkaProducer发送消息的。

ProducerFactory:KafkaProducer工厂,通过createProducer()方法可以获取(KafkaProducer) 进行发送消息,避免直接new KafkaProducer

使用方式也很简单,由于直接KafkaAutoConfiguration已经定义了相关Bean, 使用时注入Bean即可

图片

@Autowired
private KafkaTemplate kafkaTemplate;@Autowired
private ProducerFactory producerFactory;

具体代码

同步发送、异步发送的方式直接使用 kafkaTemplate即可完成,同步发送结果处理:这里简单的打印出消息的topic partition offset 等信息如下图

ListenableFuture<SendResult> future = kafkaTemplate.send(topic, content);
SendResult sendResult = future.get();
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
System.out.println("send result  topic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "  offset=" + recordMetadata.offset() );
 

                                   

图片

发送回调kafkaTemplate没有对应api , 需要通过Producer发送,我们通过producerFactory获取。

ProducerRecord record = new ProducerRecord(topic,content);Producer producer = producerFactory.createProducer();producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null){System.out.println("send error ");}else {System.out.println("send result  topic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "  offset=" + recordMetadata.offset() );}}});

http://www.lryc.cn/news/442059.html

相关文章:

  • K1计划100%收购 MariaDB; TDSQL成为腾讯云核心战略产品; Oracle@AWS/Google/Azure发布
  • Kyutai 开源对话模型 Moshi;李飞飞空间智能公司已筹集超过 2.3 亿美元丨 RTE 开发者日报
  • Go语言的io输入输出流
  • 表单里面input的type属性值有哪些?
  • 【Redis】之Geo
  • 常用的k8s容器网络模式有哪些?
  • 4位整数的数位和
  • XHTML学习
  • KTH7823——16 位高精度低延时霍尔磁编码器可编程 ABZ 和 PWM 输出模式角度传感器
  • JDBC笔记
  • 小众语言ruby在苹果中的初步应用
  • Nature: 一种基于宏基因组序列空间生成无参考的蛋白质家族的计算方法
  • play-with-docker使用指南
  • 常见中间件漏洞靶场(tomcat)
  • 一文读懂SpringCLoud
  • tcpdump使用方法
  • 密码字典txt python密码字典代码
  • ubuntu安装emqx
  • F28335 时钟及控制系统
  • 数据结构和算法之线性结构
  • 3. 轴指令(omron 机器自动化控制器)——>MC_MoveAbsolute
  • ai 回答HFS是什么 HTTP的文件服务器是什么
  • OpenHarmony(鸿蒙南向开发)——小型系统内核(LiteOS-A)【内核启动】
  • 若依笔记(六):前后端token鉴权体系
  • java框架
  • 利器 | 测试必会之 Linux 三剑客 ( grep / awk / sed )
  • 【Git原理与使用】多人协作与开发模型(2)
  • Linux vi常用命令
  • 天地伟业设备主动注册协议接入SVMSPro接入
  • C++日期类详解 第二级支线任务