当前位置: 首页 > news >正文

kafka入门(四):kafka生产者发送消息

创建生产者实例和构建消息之后,就可以开始发送消息了。

发送消息主要有三种模式:发后即忘、同步、异步。

发后即忘:

就是直接调用 生产者的 send方法发送。

发后即完,只管往 kafka中发送消息,而不关心消息是否正确到达。

这种发送方式的性能最高,可靠性也最差。

producer.send(record);

具体代码如下:

public class KafkaDemoProducer {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static void main(String[] args) {//属性配置Properties properties = getProperties(BROKER_LIST);//生产者初始化KafkaProducer<String, String> producer = new KafkaProducer<>(properties);ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "hello kafka");//发送消息try {producer.send(record);System.out.println("========>producer.send(record).");} catch (Exception e) {System.out.println("send error." + e);}producer.close();}private static Properties getProperties(String brokerList) {Properties properties = new Properties();properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.put("bootstrap.servers", brokerList);return properties;}}

同步发送:

try {producer.send(record).get();
} catch (ExecutionException | InterruptedException e) {log.error("send record get error", e);
}

同步发送的方式可靠性最高,要么消息发送成功,要么发生异常。如果发生异常,会catch并处理异常。

同步发送的性能会差一些,需要阻塞等待一条消息发送完,才能发送下一条。

异步发送:

异步发送,就是在 send 方法里指定一下 Callback 的回调函数。

消息发送成功后,会收到成功的回调。参数 metadata ,为发送成功的消息,相关的信息

如果发送失败,也会收到回调,包含失败的异常信息 exception。

producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {log.error("send onCompletion error." , exception);} else {log.info(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());}}
});

参考资料:

《深入理解Kafka 核心设计与实践原理》

http://www.lryc.cn/news/262445.html

相关文章:

  • redis集群模糊获取缓存redisKey
  • 100GPTS计划-AI翻译TransLingoPro
  • Linux install manual 1Panel
  • 母婴服务品牌网站的效果如何
  • C语言--有一个3*4的矩阵,求出其中最大值的那个元素的值,以及其所在的行号和列号
  • 安全算法(二):共享密钥加密、公开密钥加密、混合加密和迪菲-赫尔曼密钥交换
  • MYSQL练题笔记-高级字符串函数 / 正则表达式 / 子句-简单3题
  • vue扭蛋机抽奖游戏
  • 代码随想录27期|Python|Day16|二叉树|104.二叉树的最大深度|111.二叉树的最小深度|222.完全二叉树的节点个数
  • ༺༽༾ཊ—设计-简介-模式—ཏ༿༼༻
  • Matplotlib快速入门,Python通用的绘图工具库上手
  • Linux 基本语句_16_Udp网络聊天室
  • 使用ffmpeg命令进行视频格式转换
  • Mac安装Adobe AE/pr/LR/ai/ps/au/dw/id 2024/2023报错问题解决(常见错误:已损坏/2700/146/130/127)
  • Python三级 每周练习题31
  • 【DataSophon】大数据服务组件之Flink升级
  • Android笔记(十八):面向Compose组件结合Retrofit2和Rxjava3实现网络访问
  • mybatis中oracle的sql没走索引导致特别慢(未加jdbcType的)
  • QT自带打包问题:无法定位程序输入点?metaobject@qsound
  • 7.3 lambda函数
  • dcoker-compose一键部署EFAK —— 筑梦之路
  • 音视频:Ubuntu下安装 FFmpeg 5.0.X
  • 【LSM tree 】Log-structured merge-tree 一种分层、有序、面向磁盘的数据结构
  • 配置OSPF与BFD联动示例
  • 01到底应该怎么理解“平均负载”
  • jmeter,动态参数之随机数、随机日期
  • uniApp常见知识点-问题答案
  • 云原生基础入门概念
  • 一个 tomcat 下如何部署多个项目?附详细步骤
  • pycharm强制让terminal停止执行的快捷键