当前位置: 首页 > news >正文

【flink】 flink 读取debezium-json数据获取数据操作类型op/rowkind方法

flink 读取debezium-json数据获取数据操作类型op/rowkind方法。
op类型有c(create),u(update),d(delete)
参考官网案例:此处的"op": "u",就是操作类型。

{"before": {"id": 111,"name": "scooter","description": "Big 2-wheel scooter","weight": 5.18},"after": {"id": 111,"name": "scooter","description": "Big 2-wheel scooter","weight": 5.15},"source": {...},"op": "u","ts_ms": 1589362330904,"transaction": null
}

思路:添加自定义metareader
具体修改方法:去github flink 下载flink源码,选择tags,后按照自己需要的版本下载。
至idea中。修改org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat

enum ReadableMetadata枚举类中添加枚举对象:
注意:operation.type可以按需更改,opudf也能按需更改,但不能是op,否则原有的冲突。
添加如下代码:

        OPERATION_TYPE("operation.type",DataTypes.STRING().nullable(),true,DataTypes.FIELD("opudf", DataTypes.STRING()),new MetadataConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(GenericRowData row, int pos) {return row.getString(2);}})

编译项目:

# install
mvn clean package -DskipTests "-Dmaven.test.skip=true" "-Pfast" -pl flink-formats/flink-json
# maven install 到本地
mvn install:install-file "-DgroupId=org.apache.flink" "-DartifactId=flink-json-udf" "-Dversion=1.13.6" -Dpackaging=jar "-Dfile=D:\projects\flink-release-1.13.6\flink-formats\flink-json\target\flink-json-1.13.6.jar"# gradle 不能使用 implement(files("libs/flink-json-1.13.6.jar")) 的方式。需要直接加载到仓库。或者user/.m2目录下的临时仓库。
# 如下路径不能直接复制,可供参考,自己找下。
copy D:\projects\flink-release-1.13.6\flink-formats\flink-json\target\flink-json-1.13.6.jar D:\env\Gradle_Repository\caches\modules-2\files-2.1\org.apache.flink\flink-json\1.13.6\cb4daaf018e2
10faa54e76488cbb11e179502728

使用方法:
添加列定义:
opt_type STRING METADATA FROM 'value.operation.type' VIRTUAL,

create table test.some_debezium
(event_time      TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,origin_table    STRING METADATA FROM 'value.source.table' VIRTUAL,opt_type        STRING METADATA FROM 'value.operation.type' VIRTUAL,id            BIGINT,name          STRING,) WITH('connector'='kafka','topic'='your_topic','properties.bootstrap.servers'='host1:6667,host2:6667,host3:6667','properties.group.id'='your_group_id','properties.security.protocol'='SASL_PLAINTEXT','properties.sasl.kerberos.service.name'='kafka','format'='debezium-json','debezium-json.timestamp-format.standard'='ISO-8601','debezium-json.schema-include'='true','debezium-json.ignore-parse-errors'='true'
);
http://www.lryc.cn/news/572872.html

相关文章:

  • “地标界爱马仕”再拓疆域:世酒中菜联袂赤水金钗石斛定义中国GI
  • GM DC Monitor v2.0 卸载教程
  • 通达信 多空寻龙指标系统:精准捕捉趋势转折与强势启动信号 幅图指标
  • Java八股文——消息队列「场景篇」
  • 思辨场域丨AR技术如何重塑未来学术会议体验?
  • 汽车加气站操作工考试题库含答案【最新】
  • 华为 FreeArc耳机不弹窗?
  • 容器技术人们与DOCKER环境部署
  • OSPF 路由协议基础实验
  • ZeroSearch:阿里开源无需外接搜索引擎的RL框架,显著提升LLM的搜索能力!!
  • AMHS工程项目中-MCS-STKC之间的office 测试场景的介绍
  • 搭建pikachu靶场
  • 【云创智城】YunCharge充电桩系统源码实现云快充协议深度解析与Java技术实践:打造高效充电桩运营系统
  • java面试题03静态修饰类,属性,方法有什么特点?
  • macOS - 根据序列号查看机型、保障信息
  • JavaWeb RESTful 开发规范入门
  • Spring 中的依赖注入(DI)详解
  • 通过Radius认证服务器实现飞塔/华为防火墙二次认证:原理、实践与安全价值解析
  • 20250620在Ubuntu20.04.6下编译KickPi的K7的Android14系统解决缺少libril.so.toc的问题
  • 【网络安全】ios逆向一般整理
  • 求助帖:学Java开发方向还是网络安全方向前景好
  • GitHub Copilot 配置快捷键
  • WebServer实现:muduo库的主丛Reactor架构
  • 无人机低空经济十大前沿创新应用探索-具体做无人机什么呢?优雅草卓伊凡
  • 日常运维问题汇总-25
  • 倒计时 效果
  • 云祺容灾备份系统公有云备份与恢复实操-AWS
  • 【大数据高并发核心场景实战】 - 数据持久化之冷热分离
  • Android Kotlin 用法对比Java使用小结
  • 云计算与5G:如何利用5G网络优化云平台的性能