当前位置: 首页 > news >正文

kafka学习-生产者

目录

1、消息生产流程

2、生产者常见参数配置

3、序列化器

基本概念

自定义序列化器

4、分区器

默认分区规则

自定义分区器

5、生产者拦截器

作用

自定义拦截器

6、生产者原理解析


1、消息生产流程

2、生产者常见参数配置

3、序列化器

基本概念

  • 在Kafka中保存的数据都是字节数组。
  • 消息发送前,需要将消息序列化为字节数组进行发送。
  • 生产者通过key.serializer和value.serializer指定key和value的序列化器。
  • Kafka使用org.apache.kafka.common.serialization.Serializer接口定义序列化器。
  • Kafka已实现的序列化器有:ByteArraySerializer、ByteBufferSerializer、BytesSerializer、DoubleSerializer、FloatSerializer、IntegerSerializer、StringSerializer、LongSerializer、ShortSerializer。

自定义序列化器

实现org.apache.kafka.common.serialization.Serializer<T>接口,并实现其中的serializer方法。

@Data
public class User {private Integer userId;private String username;
}public class UserSerializer implements Serializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// do nothing}@Overridepublic byte[] serialize(String topic, User data) {try {// 如果数据是null,则返回nullif (data == null) return null;Integer userId = data.getUserId();String username = data.getUsername();int length = 0;byte[] bytes = null;if (null != username) {bytes = username.getBytes("utf-8");length = bytes.length;}// 第一个4字节存储userId的值// 第二个4字节存储username字节数组的长度int值// 第三个length长度,存储username序列化之后的字节数组ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);buffer.putInt(userId);buffer.putInt(length);buffer.put(bytes);return buffer.array();} catch (UnsupportedEncodingException e) {throw new SerializationException("序列化数据异常");}}@Overridepublic void close() {// do nothing}
}

4、分区器

默认分区规则

KafkaProducer.partition();DefaultPartitioner.partition();

  1. 如果record提供了分区号,则使⽤record提供的分区号
  2. 如果record没有提供分区号,则使⽤key的序列化后的值的hash值对分区数量取模
  3. 如果record没有提供分区号,也没有提供key,则使⽤轮询的⽅式分配分区号。

自定义分区器

实现org.apache.kafka.clients.producer.Partitioner接口,并实现其中的partition方法。

在生产者参数中通过配置partitioner.class指定自定义分区器。

/*** 自定义分区器*/
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 此处可以计算分区的数字。// 我们直接指定为2return 2;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

5、生产者拦截器

作用

        在发送消息前,或者在执行回调逻辑前,对消息做一些定制化的处理,比如修改消息,打印消息日志等。此外,Producer允许设置多个拦截器从而形成一条拦截器链,Producer将按照指定顺序调用它们。

自定义拦截器

        自定义拦截器实现org.apache.kafka.clients.producer.ProducerInterceptor接口,并实现其中的onSend()、onAcknowledgement()、close()接口。其中:

  • onSend(ProducerRecord):Producer 确保在消息被序列化前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修 改消息所属的topic和分区,否则会影响⽬标分区的计算。
  • onAcknowledgement(RecordMetadata, Exception):该⽅法会在消息被应答之前或消息发送失败时调⽤, 并且通常都是在Producer回调逻辑触发之前。
  • close:关闭Interceptor,主要⽤于执⾏⼀些资源清理⼯作。

        在生产者参数中通过配置ProducerConfig.INTERCEPTOR_CLASSES_CONFIG指定自定义拦截器。

public class Interceptor<KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);@Overridepublic ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {System.out.println("拦截器---go");// 此处根据业务需要对相关的数据作修改String topic = record.topic();Integer partition = record.partition();Long timestamp = record.timestamp();KEY key = record.key();VALUE value = record.value();Headers headers = record.headers();// 添加消息头headers.add("interceptor", "interceptor".getBytes());ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY, VALUE>(topic, partition, timestamp, key, value, headers);return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("拦截器---back");if (exception != null) {// 如果发生异常,记录在日志中LOGGER.error(exception.getMessage());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

6、生产者原理解析

以上内容为个人学习理解,如有问题,欢迎在评论区指出。

部分内容截取自网络,如有侵权,联系作者删除。

http://www.lryc.cn/news/160998.html

相关文章:

  • 【Python】设计模式
  • C++ 数字
  • code阶段——gitgitlab安装
  • C 风格文件输入/输出---无格式输入/输出
  • Spring-MVC的文件上传下载,及插件的使用(让项目开发更节省时间)
  • 算法 数据结构 递归冒泡算法 java冒泡算法 优化递归冒泡 数据结构(九)
  • 【计算机视觉 | 目标检测】目标检测常用数据集及其介绍(十五)
  • 洛谷P8814:解密 ← CSP-J 2022 复赛第2题
  • Flutter实现CombineExecutor进行多个异步分组监听,监听第一个异步执行的开始和最后一个异步执行结束时机。
  • 2023 年最新Java 毕业设计选题题目参考,500道 Java 毕业设计题目,值得收藏
  • Mac电脑其他文件占用超过一大半的内存如何清理?
  • geopandas 笔记: datasets 数据集
  • 长胜证券:三大拐点共振 看好智能驾驶新一轮行情
  • AIGC专栏5——EasyPhoto AI写真照片生成器 sd-webui插件介绍、安装与使用
  • 【Python程序设计】 工厂模式【07/8】
  • PHP8的多维数组-PHP8知识详解
  • 【【STM32--28--IO引脚的复用功能】】
  • CodeJock Active-X / COM v22.1.0 Crack
  • mac通过docker搭建elasticsearch:8.9.2以及kibana:8.9.2
  • python实现排列组合代码
  • 盲盒小程序开发方案
  • Mysql锁
  • Kubernetes(k8s)安装NFS动态供给存储类并安装KubeSphere
  • 机器学习笔记 - 【机器学习案例】基于KerasCV的预训练模型自定义多头+多标签预测
  • Linux Debian常用70条经典运维命令和使用案例
  • 【涵子来信】——步入中学,日积跬步,以致千里
  • 【sgCreateAPI】自定义小工具:敏捷开发→自动化生成API接口脚本(接口代码生成工具)
  • 数据库相关基础知识
  • LeetCode刷题笔记【23】:贪心算法专题-1(分发饼干、摆动序列、最大子序和)
  • C++算法 —— 分治(2)归并