当前位置: 首页 > news >正文

Kafka 如何优雅实现 Varint 和 ZigZag 编码

ByteUtils 是 Kafka 中一个非常基础且核心的工具类。从包名 common.utils 就可以看出,它被广泛用于 Kafka 的各个模块中。它的主要职责是提供一套高效、底层的静态方法,用于在字节缓冲区 (ByteBuffer)、字节数组 (byte[]) 以及输入/输出流 (InputStream/OutputStream) 中读写 Java 的基本数据类型。


ZigZag 编解码过程的数学原理详解

康托尔对角线映射。

可以找到一种方式,任何一个有理数都可以在有限位被枚举到

ZigZag 编码是一种巧妙的算法,它能将有符号整数(正数、负数、零)映射到无符号整数数轴上,其核心优势在于能将绝对值小的数(无论正负)都映射为小的无符号整数。这使得它与 Varint 编码结合使用时,能极大地压缩数据体积。

其编解码过程可以分为对非负数和负数两种情况进行讨论。


编码过程 (Signed -> Unsigned)

编码操作由公式 (n << 1) ^ (n >> 63)(以64位 long 为例)实现,我们可以将其拆解为两种情况:

对于非负数 (x >= 0):

  • ​编码公式​​: x -> 2*x
  • ​推导​​:
    当 x 为非负数时,x >> 63 的结果是 0。
    因此编码公式简化为 (x << 1) ^ 0,即 x * 2
  • ​示例​​:
    0 -> 0, 1 -> 2, 2 -> 4, ...

对于负数 (x < 0):

  • ​编码公式​​: x -> -2*x - 1
  • ​推导​​:
    当 x 为负数时,x >> 63 的结果是 -1(二进制全为1)。
    编码公式变为 (x << 1) ^ -1
    ^ -1 在二进制中等价于按位取反 (~)。
    因此,编码结果为 ~(x * 2)
    根据二进制补码的性质,~a = -a - 1,所以 ~(x * 2) 等于 - (x * 2) - 1,即 -2x - 1
  • ​示例​​:
    -1 -> 1, -2 -> 3, -3 -> 5, ...

​效果​​:
通过这种方式,正数被映射到偶数,负数被映射到奇数,实现了在无符号数轴上的“之”字形(ZigZag)交错排列。


解码过程 (Unsigned -> Signed)

解码操作由公式 (y >>> 1) ^ -(y & 1) 实现,其中 y 是编码后的无符号数。

(1) y >>> 1(无符号右移一位):

  • ​数学意义​​:
    等价于 y / 2(向下取整)。
  • ​对非负数编码结果 (y 为偶数)​​:
    y/2 直接得到原始值 x,解码完成。
  • ​对负数编码结果 (y 为奇数)​​:
    已知 y = -2x - 1,此时 y/2 = (-2x - 1) / 2 = -x - 1(向下取整)。

(2) -(y & 1)(判断奇偶并生成掩码):

  • ​作用​​:
    y & 1 用于判断 y 的奇偶性:
    • 若 y 为偶数,结果为 0;
    • 若 y 为奇数,结果为 1。
  • ​掩码生成​​:
    -(y & 1) 生成掩码:
    • y 为偶数时,掩码为 0;
    • y 为奇数时,掩码为 -1(二进制全为1)。

(3) ^(异或操作):

  • ​当 y 为偶数(来自非负数)​​:
    解码公式为 (y/2) ^ 0,结果即 y/2(原始值 x)。
  • ​当 y 为奇数(来自负数)​​:
    解码公式为 (y/2) ^ -1
    已知此时 y/2 = -x - 1,因此:
    (-x - 1) ^ -1
    ^ -1 等价于按位取反 (~),故结果为 ~(-x - 1)
    根据补码性质 ~a = -a - 1,推导如下:
    ~(-x - 1) = -(-x - 1) - 1 = (x + 1) - 1 = x
    最终还原为原始负数 x。

​总结​​:
通过这一系列精巧的位运算,解码过程成功将无符号数还原为原始有符号数。

可变长度整数(Varints)和长整数(Varlongs)

这是 ByteUtils 中非常重要的一部分,也是 Kafka 实现高效数据压缩的关键技术之一。Varint 是一种使用一个或多个字节序列化整数的方法,数值越小的整数(绝对值)占用的字节数越少。这对于存储大量小整数(如长度、数量等)的场景能有效节省空间。

Kafka 的 Varint 实现参考了 Google Protocol Buffers 的编码方案。

无符号 Varint (Unsigned Varint)

这是 Varint 的基础。它将一个 32 位整数编码为 1 到 5 个字节。每个字节的最高位(MSB)是标志位,1 表示后面还有字节,0 表示这是最后一个字节。剩下的 7 位用于存储数据。

  • 读取 (readUnsignedVarint):
    // ... existing code ...
    public static int readUnsignedVarint(ByteBuffer buffer) {byte tmp = buffer.get();if (tmp >= 0) {return tmp;} else {int result = tmp & 127;if ((tmp = buffer.get()) >= 0) {result |= tmp << 7;} else {result |= (tmp & 127) << 7;if ((tmp = buffer.get()) >= 0) {result |= tmp << 14;} else {result |= (tmp & 127) << 14;if ((tmp = buffer.get()) >= 0) {result |= tmp << 21;} else {result |= (tmp & 127) << 21;result |= (tmp = buffer.get()) << 28;if (tmp < 0) {throw illegalVarintException(result);}}}}return result;}
    }
    // ... existing code ...
    
    • 代码分析: 这段代码通过一系列的 if-else 结构展开了循环,这是一种为了性能的优化(循环展开)。
    • 它逐字节读取,检查最高位(通过 tmp >= 0 判断,如果为正数,说明最高位是0)。
    • 如果最高位是1,就取其低7位 (tmp & 127),并将其拼接到结果 result 的高位上,然后继续读取下一个字节。
    • 如果读取超过5个字节仍然没有结束,会抛出异常。

有符号 Varint (Signed Varint - ZigZag 编码)

直接用 Varint 编码负数效率很低(例如 -1 会被编码成 5 个字节)。为了高效地编码有符号数,特别是那些绝对值较小的负数,Kafka 使用了 ZigZag 编码。它通过一种位操作,将有符号数映射到无符号数上,使得绝对值小的数(无论正负)都映射为小的无符号数。

  • 映射规则:
    • (n << 1) ^ (n >> 31) for signed n

// ... existing code ...
public static int readVarint(ByteBuffer buffer) {int value = readUnsignedVarint(buffer);return (value >>> 1) ^ -(value & 1);
}
// ... existing code ...

这个方法的核心作用是解码一个经过 ZigZag(对角线)编码 和 Varint 编码 的整数。整个过程分为两步:

  1. 从 ByteBuffer 中读取一个经过 Varint 编码的无符号整数。
  2. 对这个无符号整数进行 ZigZag 解码,将其还原为原始的有符号整数。

1. int value = readUnsignedVarint(buffer);

这一步是 Varint 解码。它首先调用 readUnsignedVarint 方法,该方法会从字节流中读取 1 到 5 个字节,并将它们解析成一个32位的无符号整数。这个解析出来的 value 并不是最终结果,而是经过 ZigZag 编码后的中间值。

2. return (value >>> 1) ^ -(value & 1);

这是整个方法最关键的部分,即 ZigZag(对角线)解码。这一行代码非常精妙,它将上一步得到的无符号整数 value 还原回它所代表的原始有符号整数。

为什么需要 ZigZag 编码?

Varint 编码对于小的正整数效率很高(例如,0-127 只需要1个字节)。但对于负数,其二进制补码表示通常是一个很大的正数(例如,-1 的补码是 0xFFFFFFFF),如果直接用 Varint 编码,会占用最多的5个字节,完全失去了 Varint 的优势。

ZigZag 编码解决了这个问题。它通过一种位运算,将有符号整数“之”字形地映射到无符号整数上,从而保证绝对值小的数(无论正负)都会被映射成小的无符号数。

映射关系(对角线/ZigZag 编码)

原始有符号值 (Original Signed)编码后无符号值 (Encoded Unsigned)
00
-11
12
-23
24
......
2,147,483,6474,294,967,294
-2,147,483,6484,294,967,295

解码公式 (value >>> 1) ^ -(value & 1) 的剖析

让我们通过两个例子来理解这个解码过程:

  • 示例 1: 解码 -1

    1. 从映射表可知,-1 编码后的值为 1。所以 value = 1
    2. value & 1 => 1 & 1 => 1。 (取最低位,用于判断原始值的符号)
    3. -(value & 1) => -1。在二进制补码中,-1 是 ...11111111
    4. value >>> 1 => 1 >>> 1 => 0。 (无符号右移一位,获取数值部分)
    5. 0 ^ -1 => 000...000 ^ 111...111 => 111...111。结果是 -1解码正确
  • 示例 2: 解码 2

    1. 从映射表可知,2 编码后的值为 4。所以 value = 4 (二进制 ...00000100)。
    2. value & 1 => 4 & 1 => 0
    3. -(value & 1) => -0 => 0
    4. value >>> 1 => 4 >>> 1 => 2
    5. 2 ^ 0 => 2解码正确

readVarlong

readVarlong 和 writeVarlong 是 Varint 的 64 位版本,原理完全相同,只是最多可以占用 10 个字节,同样也使用了 ZigZag 编码来处理有符号长整型。

// ... existing code ...public static long readVarlong(ByteBuffer buffer)  {long raw =  readUnsignedVarlong(buffer);return (raw >>> 1) ^ -(raw & 1);}// visible for testingstatic long readUnsignedVarlong(ByteBuffer buffer)  {long value = 0L;int i = 0;long b;while (((b = buffer.get()) & 0x80) != 0) {value |= (b & 0x7f) << i;i += 7;if (i > 63)throw illegalVarlongException(value);}value |= b << i;return value;}
// ... existing code ...
  • 代码分析readUnsignedVarlong 使用了 while 循环,逻辑更清晰。它不断读取字节,只要字节的最高位是1 ((b & 0x80) != 0),就将其低7位拼接到结果中,并增加位移量 i。当读到最高位为0的字节时,循环结束。

writeVarlong

此方法的作用是将一个64位的有符号长整型 (long) 编码后写入到一个 DataOutput 输出流中。这个编码过程与我们之前讨论的 writeVarint 非常相似,同样是 ZigZag(对角线)编码 和 Varint 编码 的组合,只不过这次是针对64位的 long 类型。

// ... existing code .../*** Write the given integer following the variable-length zig-zag encoding from* <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>* into the output.** @param value The value to write* @param out The output to write to*/public static void writeVarlong(long value, DataOutput out) throws IOException {long v = (value << 1) ^ (value >> 63);while ((v & 0xffffffffffffff80L) != 0L) {out.writeByte(((int) v & 0x7f) | 0x80);v >>>= 7;}out.writeByte((byte) v);}
// ... existing code ...

整个方法的执行可以分为两个主要步骤:

ZigZag(对角线)编码

long v = (value << 1) ^ (value >> 63);

这是编码的第一步,也是核心的 ZigZag 编码步骤。

  • value << 1: 将原始的 long 值向左移动一位。这个操作的目的是为符号位腾出空间。
  • value >> 63: 这是一个算术右移操作。对于 long 类型,算术右移63位会得到一个全为符号位的值。如果 value 是正数或0,结果是 0L;如果 value 是负数,结果是 -1L (二进制 0xFFFFFFFFFFFFFFFF)。
  • ^: 异或操作。
    • 如果 value 是正数或0(value << 1) ^ 0,结果就是 value 的两倍。
    • 如果 value 是负数(value << 1) ^ -1,结果是对 value 左移一位后的值进行按位取反。

这个公式巧妙地将有符号的 long 映射到了无符号的 long 数轴上,实现了我们之前讨论过的“对角线”映射,确保了绝对值小的数(无论正负)都会得到一个小的无符号编码值 v

Varint 编码

接下来的 while 循环负责将上一步得到的无符号编码值 v 进行 Varint 编码,并逐字节写入输出流。

while ((v & 0xffffffffffffff80L) != 0L) {out.writeByte(((int) v & 0x7f) | 0x80);v >>>= 7;
}
out.writeByte((byte) v);
  • while ((v & 0xffffffffffffff80L) != 0L): 这是循环的条件。0xffffffffffffff80L 是一个掩码,它的低7位是0,其余位都是1。这个条件检查 v 是否还有超过7位的数据。换句话说,只要 v 的值大于等于 128 (2^7),循环就会继续。

  • out.writeByte(((int) v & 0x7f) | 0x80);: 这是循环体内的核心操作。

    • (int) v & 0x7f: 取出 v 的低7位数据。
    • | 0x80: 将这7位数据与 0x80 (二进制 10000000) 进行或运算。这会将该字节的最高位(MSB)设置为 1,表示后面还有更多的字节。
    • out.writeByte(...): 将这个构造好的字节写入输出流。
  • v >>>= 7;: 将 v 无符号右移7位,准备处理下一组7位数据。

  • out.writeByte((byte) v);: 当循环结束时,意味着 v 的剩余值已经小于128,可以用7位来表示。这时,将 v 的最后部分直接作为一个字节写入。这个字节的最高位自然是 0,表示这是 Varint 序列的最后一个字节。

总结

writeVarlong 方法通过一个两步过程高效地序列化一个 long 值:

  1. ZigZag 编码: 使用 (value << 1) ^ (value >> 63) 将有符号 long 映射为无符号 long,使得小数值(无论正负)编码后依然是小数值。
  2. Varint 编码: 使用 while 循环,每次从编码后的值中取出7位数据,并加上一个“续传”标志位(MSB=1),然后写入字节流,直到最后不足7位的数据作为最后一个字节(MSB=0)写入。

这种组合编码方式是 Kafka 协议中节省空间、提升效率的关键技术之一,尤其在传输大量包含小整数(如时间戳增量、偏移量增量等)的消息时效果显著。

无符号整数(Unsigned Integers)的处理

Java 的基本数据类型中没有无符号整数(unsigned int)。但在网络协议或与其他系统交互时,经常需要处理无符号数。ByteUtils 提供了方法来模拟对 32 位无符号整数的读写。

读取无符号整数

为了避免将一个最高位为1的32位整数错误地解释为负数,ByteUtils 在读取后将其转换为 long 类型。

// ... existing code .../*** Read an unsigned integer from the current position in the buffer, incrementing the position by 4 bytes** @param buffer The buffer to read from* @return The integer read, as a long to avoid signedness*/public static long readUnsignedInt(ByteBuffer buffer) {return buffer.getInt() & 0xffffffffL;}
// ... existing code ...
  • 代码分析buffer.getInt() 读取一个标准的 32 位有符号整数。关键在于 & 0xffffffffL 这个操作。它是一个按位与操作,通过一个 long 类型的掩码,将读取到的 int 值(可能会被当作负数)转换为一个正的 long 值,从而正确地表示了原始的无符号整数值。

写入无符号整数

写入时,逻辑类似,将一个 long 值截断为 32 位 int 再写入。

// ... existing code .../*** Write the given long value as a 4 byte unsigned integer. Overflow is ignored.** @param buffer The buffer to write to* @param value The value to write*/public static void writeUnsignedInt(ByteBuffer buffer, long value) {buffer.putInt((int) (value & 0xffffffffL));}
// ... existing code ...
  • 代码分析(value & 0xffffffffL) 确保只取 long 值的低 32 位,然后强制转换为 int 并写入 ByteBuffer

此外,该类还提供了处理小端字节序(Little-Endian)的方法,如 readUnsignedIntLE 和 writeUnsignedIntLE,这在需要与采用不同字节序的系统交互时非常有用。Kafka 的网络协议本身是网络字节序,即大端(Big-Endian)。

其他工具方法

除了上述核心功能,ByteUtils 还包含一些其他有用的方法,例如:

  • readDouble/writeDouble: 读写 64 位浮点数。
  • EMPTY_BUF: 提供一个静态的、空的 ByteBuffer 实例,避免重复创建。

总结

ByteUtils 是 Kafka 中一个至关重要的底层工具类,它封装了对 Java 基本类型与字节之间进行高效转换的逻辑。它的设计体现了对性能的极致追求,例如在 readUnsignedVarint 中使用循环展开,以及提供 Varint/Varlong 这种空间高效的编码方式。理解这个类的工作原理,特别是 Varint 和 ZigZag 编码,对于深入理解 Kafka 的网络协议、消息格式以及存储机制非常有帮助。

http://www.lryc.cn/news/595482.html

相关文章:

  • 【每天一个知识点】非参聚类(Nonparametric Clustering)
  • 期权到期会对大盘有什么影响?
  • 如何用 Z.ai 生成PPT,一句话生成整套演示文档
  • 【操作篇】群晖NAS用root权限直接访问系统分区文件
  • 圆柱电池自动分选机:全流程自动化检测的革新之路
  • 83、形式化方法
  • 淘宝获取商品分类接口操作指南
  • MySQL介绍和MySQL包安装
  • accelerate 在Pycham中执行的设置方法
  • 泛型:C#中的类型抽象艺术
  • Telnet远程登录配置全流程详解
  • 大模型为什么出现幻觉?
  • 二分查找:区间内查询数字的频率
  • 【python数据结构算法篇】python数据结构
  • Linux——C/C++静态库与动态库完全指南:从制作到实战应用
  • 安全测试学习
  • 产品剖析之AI创作与协作的未来革新者Flowith
  • nerf-2020
  • pandas 的series和dataframe的用法,六个题目
  • 牛客网题解 | 单词识别
  • Playwright-MCP浏览器会话复用全解析
  • 腾讯客户端开发面试真题分析
  • Mac上安装Homebrew的详细步骤
  • 语义化版本规范(SemVer)
  • 北京-4年功能测试2年空窗-报培训班学测开-第五十六天
  • CS课程项目设计4:支持AI人机对战的五子棋游戏
  • Java学习-----AIO模型
  • 2025杭电多校赛(2)1006 半
  • 对称加密技术详解:原理、算法与实际应用
  • 代码随想录算法训练营二十二天|回溯part04