当前位置: 首页 > news >正文

Debezium SchemaNameAdjuster 分析

Debezium SchemaNameAdjuster 分析

目录

  • 1. 概述
  • 2. 核心功能
  • 3. 实现原理
  • 4. 应用场景
  • 5. 扩展示例
  • 6. 总结

1. 概述

SchemaNameAdjuster 是 Debezium 中的一个工具类,主要用于确保 Schema 名称符合 Avro 命名规范。在数据库变更事件被转换为 Kafka 消息时,需要为每个表和字段创建相应的 Avro Schema,而这些名称必须符合 Avro 的命名规则。

2. 核心功能

  1. 名称校验

    • 检查 Schema 名称是否符合 Avro 命名规范
    • 验证首字符和非首字符的合法性
  2. 名称调整

    • 将不合法字符替换为合法字符(默认使用下划线’_')
    • 保持名称的语义性和可读性
  3. 冲突处理

    • 检测并处理名称冲突
    • 支持自定义冲突处理策略

3. 实现原理

3.1 核心接口

public interface SchemaNameAdjuster {/*** 调整提议的名称使其符合 Avro 命名规范*/String adjust(String proposedName);
}

3.2 名称验证规则

  1. 首字符规则
public static boolean isValidFullnameFirstCharacter(char c) {return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_';
}
  1. 非首字符规则
public static boolean isValidFullnameNonFirstCharacter(char c) {return c == '.' || isValidFullnameFirstCharacter(c) || (c >= '0' && c <= '9');
}

3.3 调整策略

  1. 默认策略
SchemaNameAdjuster adjuster = SchemaNameAdjuster.create("_", (original, replacement, conflict) -> {LOGGER.warn("Schema name '{}' is invalid, using '{}' instead", original, replacement);
});
  1. 自定义替换
SchemaNameAdjuster customAdjuster = SchemaNameAdjuster.create(c -> c == '-' ? "_" : String.valueOf(c),(original, replacement, conflict) -> {// 自定义冲突处理逻辑}
);

4. 应用场景

4.1 表 Schema 构建

在构建数据库表的 Schema 时,需要为 Value Schema 和 Key Schema 生成合法的 Avro 名称:

SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Value"));
SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Key"));

4.2 CloudEvents 格式转换

在将 Debezium 事件转换为 CloudEvents 格式时,需要调整 Schema 名称:

CESchemaBuilder ceSchemaBuilder = defineSchema().withName(schemaNameAdjuster.adjust(maker.ceEnvelopeSchemaName()))

4.3 逻辑表路由

在进行逻辑表路由时,需要为新的目标主题生成合法的 Schema 名称:

valueBuilder.name(schemaNameAdjuster.adjust(newTopicName + ".Value"));

4.4 心跳机制

在配置心跳机制时,需要确保心跳消息的 Schema 名称符合规范:

return new HeartbeatImpl(interval,topic,logicalName,schemaNameAdjuster);

4.5 基本用例

  1. 表名转换
String tableName = "my-table";
String adjustedName = adjuster.adjust(tableName);  // 结果: "my_table"
  1. 复杂Schema名称
String complexName = "com.example.my-schema.v2";
String adjusted = adjuster.adjust(complexName);    // 结果: "com.example.my_schema.v2"
  1. 特殊字符处理
String specialChars = "table$name@2.0";
String adjusted = adjuster.adjust(specialChars);   // 结果: "table_name_2.0"

4.6 具体Schema生成示例

让我们以一个具体的表结构为例,展示 SchemaNameAdjuster 如何处理 Schema 名称:

-- 原始表结构
CREATE TABLE inventory.products (id INT PRIMARY KEY,name VARCHAR(255),description TEXT,weight DECIMAL(5,
http://www.lryc.cn/news/504330.html

相关文章:

  • Stable Diffusion绘画 | SDXL模型使用注意事项
  • (五)机器学习 - 数据分布
  • Flink State面试题和参考答案-(上)
  • 利用开源Stable Diffusion模型实现图像压缩比竞争方法用更低的比特率生成更逼真的图像
  • QT信号与槽机制详解
  • openGauss开源数据库实战二十二
  • BurpSuite解决暴力破解时需要验证码问题
  • WPF Combox使用 Text无法选择正确获取CHange后的Text
  • 【速览】设计模式(更新中)
  • 【stable diffusion部署】Stable Diffusion开源本地化的文生图图生图AI
  • 县城楼市踩踏式降价,或现2字头,率先回归月薪一平方的合理价格
  • 计算机组成原理(七):二进制编码
  • 【GitHub分享】you-get项目
  • 论文概览 |《Sustainable Cities and Society》2024.12 Vol.116
  • 解决node.js的req.body为空的问题
  • Mysql学习笔记之安装
  • 将PDF流使用 canvas 绘制然后转为图片展示在页面上(二)
  • 【深度学习】 零基础介绍卷积神经网络(CNN)
  • Coze概述
  • 康佳Android面试题及参考答案(多张原理图)
  • 2022 年 3 月青少年软编等考 C 语言四级真题解析
  • 关于24年408真题的疑问
  • 【容器】k8s学习笔记基础部分(三万字超详细)
  • dayjs(2kb)和momentjs(70kb)关系详述及项目中如何选择讲解
  • 【Python网络爬虫笔记】11- Xpath精准定位元素
  • 6.python列表
  • Android中bindService和startService启动服务有何区别
  • 超牛免费 机械臂模型、工业机器人模型下载网站集合
  • 引领未来的变革:15种前沿RAG技术及其应用探索
  • Scala泛型应用场景