当前位置: 首页 > news >正文

Flink CDC 自定义函数处理 SQLServer XML类型数据 映射 doris json字段方案

Flink CDC 自定义函数处理 SQLServer XML类型数据方案

1. 背景

因业务使用SQLServer数据库,CDC同步到doris 数仓。对于SQLServer xml类型,doris没有相应的字段对应,

可以使用json来存储xml数据。需要进行一步转换。从 flink 自定义函数入手。

2. 解决方案

  • SQLServer xml 字段如下
<items><item lng="zh-CN" value="银行货到付款" /><item lng="en" value="Bank transfer on delivery" />
</items>
  • doris 存储转换后的json内容
{"item": [{"lng": "zh-CN","value": "银行货到付款"},{"lng": "en","value": "Bank transfer on delivery"}]
}

在这里插入图片描述

  • flink 自定义函数代码

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 将XML转换为JSON*/
public class XmlToJson extends ScalarFunction {private Logger log = LoggerFactory.getLogger(XmlToJson.class);/*** 创建XmlMapper对象用于解析XML*/private final XmlMapper xmlMapper = new XmlMapper();public String eval(String xml) {// 将XML字符串解析为JsonNode对象JsonNode jsonNode = null;try {jsonNode = xmlMapper.readTree(xml);} catch (JsonProcessingException e) {log.error("XML解析失败", e);}// 将JsonNode对象转换为JSON字符串return jsonNode.toString();}
}
  • doris 表
-- GName 为json格式
CREATE TABLE `table1` (`ID` int(11) NOT NULL COMMENT '字典表统一ID',`Name` varchar(600) NULL COMMENT '统一进行字典命名',`GName` json NULL COMMENT '采用xml存储多语言',
) ENGINE=OLAP
UNIQUE KEY(`ID`)
COMMENT '测试表'
DISTRIBUTED BY HASH(`ID`) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"is_being_synced" = "false",
"storage_format" = "V2",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false"
);
  • 注册自定义函数 sql调用转换
create temporary function xml_to_json as 'com.zfb.flink.udf.XmlToJson';INSERT INTO flink_doris (`ID`,`Name`, `GName`)
SELECT 
`ID`,`Name`, xml_to_json(`GName`), `TypeID`
FROM table1;
  • doris json使用
selectjson_extract_string(GName, '$.item[0].value') as cn_name,*
fromtable1;  
http://www.lryc.cn/news/516187.html

相关文章:

  • F.interpolate函数
  • 华为交换机---自动备份配置到指定ftp/sftp服务器
  • nginx学习之路-nginx配置https服务器
  • UCAS 24秋网络认证技术 CH10 SSL 复习
  • 【linux内核分析-存储】EXT4源码分析之“文件删除”原理【七万字超长合并版】(源码+关键细节分析)
  • 代码随想录 day62 第十一章 图论part11
  • springboot571基于协同过滤算法的私人诊所管理系统(论文+源码)_kaic
  • Uniapp Android 本地离线打包(详细流程)
  • vite+vue3动态引入资源文件(问题已解决但离了个大谱)
  • 通过 4 种方式快速将音乐从 iPod 传输到 Android
  • ArcGIS中怎么把数据提取到指定范围(裁剪、掩膜提取)
  • 【Vaadin flow 实战】第3讲-快速上手构建VaadinFlow+Springboot的全栈web项目
  • HBase Cassandra的部署和操作
  • 用户界面软件01
  • 【云原生】Docker Compose 从入门到实战使用详解
  • 【ShuQiHere】使用 SCP 进行安全文件传输
  • 海康威视H5player问题汇总大全
  • 力扣23.合并K个升序链表
  • 【C 语言指针篇】指针的灵动舞步与内存的神秘疆域:于 C 编程世界中领略指针艺术的奇幻华章
  • 游戏关卡设计的常用模式
  • 在一台服务器上使用docker运行kafka集群
  • Apache Celeborn 在B站的生产实践
  • JOIN 和 OUTER JOIN,SQL中常见的连接方式
  • Vue2: table加载树形数据的踩坑记录
  • 电子信息硕士面试经验
  • dns网址和ip是一一对应的吗?
  • springboot3 redis 常用操作工具类
  • Java工程师实现视频文件上传minio文件系统存储及网页实现分批加载视频播放
  • Redis(二)value 的五种常见数据类型简述
  • Docker 环境中搭建 Redis 哨兵模式集群的步骤与问题解决