当前位置: 首页 > news >正文

Kafka生产者消息异步发送并返回发送信息api编写教程

1.引入依赖(pox.xml文件)

<dependencies>

        <dependency>

            <groupId>org.apache.kafka</groupId>

            <artifactId>kafka-clients</artifactId>

            <version>3.6.2</version>

        </dependency>

</dependencies>

2.创建java类

3.配置运行属性

//连接的服务器

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");

//指定对应的key和value的序列化类型

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

//关联自定义分区器

//properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.ljr.kafka.producer.MyPartitioner");

4.创建生产者对象

键入new KafkaProducer<>(),光标置于括号内CTRL+P可以显示需要对象为properties;

键入new Properties().var 回车,键入new KafkaProducer<>(properties).var 回车,选择变量名

5.发送消息并返回发送结果

键入KafkaProducer.send(),提示需要对象ProducerRecord;键入topic名(order)和要发送的信息(“0000”+i),new Callback()回车会弹出需要重写的抽象类,补全返回条件、需要返回的信息即可实现抽象类;

e == null 表示消息全部发送完毕;

6.关闭资源

KafkaProducer.close();

7.运行查看结果

运行:

可以看到有返回信息;

另开窗口查看发送结果

kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092 --topic order

信息发送成功;

8.完整代码

package com.ljr.kafka.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
/关联自定义分区器
//		properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.ljr.kafka.producer.MyPartitioner");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);for(int i =0; i < 3; i++){kafkaProducer.send(new ProducerRecord<>("customers", "LiSi" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("topic:" + recordMetadata.topic() + " partition:" + recordMetadata.partition());}}});}kafkaProducer.close();}
}

http://www.lryc.cn/news/361643.html

相关文章:

  • WiFi串口服务器与工业路由器:局域网应用的协同之力
  • Unity功能——通过按键设置物体朝左/右旋转(含C#转xlua版)
  • 泛微ecology开发修炼之旅
  • PostgreSQL的视图pg_locks
  • 元宇宙NFG结合IPO线上营销模型合理降税
  • Python打印当前目录下,所有文件名的首字母
  • 程序员应该有什么职业素养?
  • 【PostgreSQL17新特性之-冗余IS [NOT] NULL限定符的处理优化】
  • Flink的简单学习二
  • 如何提高员工的工作主动性?
  • FFmpeg PCM编码为AAC
  • React@16.x(16)Render Props
  • STM32 定时器问题
  • CSS学习笔记目录
  • 随笔-我在武汉一周了
  • Python 爬虫零基础:探索网络数据的神秘世界
  • 微信小程序的view的属性值和用法
  • Python优化、异常处理与性能提升技巧
  • Flink状态State | 大数据技术
  • go语言方法之方法值和方法表达式
  • TDMQ CKafka 版弹性存储能力重磅上线!
  • 24、Linux网络端口
  • Mysql全文搜索和LIKE搜索有什么区别
  • elementplu父级页面怎么使用封装子组件原组件的方法
  • el-date-picker选择开始日期的近半年
  • C++
  • nginx源码阅读理解 [持续更新,建议关注]
  • 笔试训练2
  • 构建坚不可摧的Web安全防线:深入剖析二阶注入与全面防御策略
  • (4) qml动态元素