当前位置: 首页 > news >正文

Seatunnel解决ftp读取json文件无法读取数组以及格式化之后的json无法解析的问题

问题原因

在JsonRead这个方法里面 在源码中使用的逻辑是读取一行 然后把这个json进行解析
但是这样存在一个问题 比如如果json的格式是这样的
{
name:“zhangsan”,
age:25
}
如果是这样的话 第一行读到的内容就是 {
显然 一个 { 并不是一个json 这样会导致解析json失败

问题解决的思路

我的方法是将整个文件中的内容全部解析
然后使用Seatunnel中自带的JackJson这个工具类进行解析
然后再获取到单个的Json对象 之后再解析成一个Json的字符串
因为解析过之后的Json字符串肯定不存在换行 所以这种换行的问题算是规避了
但是这样又引发了另一个问题就是 一下子加载全部的文件内容可能会导致内存飙升 而且解析json 构造对象这个过程也是比较耗费资源的
但是我目前没有想出来更好的方法
我目前的业务需求是 这种ftp的文件都是小文件 不存在特别大的json 所以我的这个方法是可以完成现在的需求的

修改代码的内容

要修改的代码的位置是
org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java

    @Overridepublic void readProcess(String path,String tableId,Collector<SeaTunnelRow> output,InputStream inputStream,Map<String, String> partitionsMap,String currentFileName)throws IOException {InputStream actualInputStream;switch (compressFormat) {case LZO:LzopCodec lzo = new LzopCodec();actualInputStream = lzo.createInputStream(inputStream);break;case NONE:actualInputStream = inputStream;break;default:log.warn("Json file does not support this compress type: {}",compressFormat.getCompressCodec());actualInputStream = inputStream;break;}try (BufferedReader reader =new BufferedReader(new InputStreamReader(actualInputStream, encoding))) {//TODO wxt 优先使用之前的方法try{reader.lines().forEach(line -> {try {SeaTunnelRow seaTunnelRow =deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8));if (isMergePartition) {int index = seaTunnelRowType.getTotalFields();for (String value : partitionsMap.values()) {seaTunnelRow.setField(index++, value);}}seaTunnelRow.setTableId(tableId);output.collect(seaTunnelRow);} catch (IOException e) {String errorMsg =String.format("Deserialize this jsonFile data [%s] failed, please check the origin data",line);throw new FileConnectorException(FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,errorMsg,e);}});}catch (Exception e){//region 我修改的内容//首先读取全部的内容// 将 BufferedReader 内容读取到一个 StringStringWriter stringWriter = new StringWriter();String line;while ((line = reader.readLine()) != null) {stringWriter.write(line);}String jsonContent = stringWriter.toString();// 判断 JSON 类型并处理ObjectMapper objectMapper = new ObjectMapper();JsonNode jsonNode = objectMapper.readTree(jsonContent);if (jsonNode.isArray()) {// 遍历数组并转换为单行字符串for (JsonNode node : jsonNode) {String singleLineJson = objectMapper.writeValueAsString(node);// region 这一部分是我直接从上面复制下来的try {SeaTunnelRow seaTunnelRow =deserializationSchema.deserialize(singleLineJson.getBytes(StandardCharsets.UTF_8));if (isMergePartition) {int index = seaTunnelRowType.getTotalFields();for (String value : partitionsMap.values()) {seaTunnelRow.setField(index++, value);}}seaTunnelRow.setTableId(tableId);output.collect(seaTunnelRow);} catch (IOException e1) {String errorMsg =String.format("Deserialize this jsonFile data [%s] failed, please check the origin data",singleLineJson);throw new FileConnectorException(FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,errorMsg,e);}// endregion}} else if (jsonNode.isObject()) {String singleLineJson = objectMapper.writeValueAsString(jsonNode);// region 这一部分是我直接从上面复制下来的try {SeaTunnelRow seaTunnelRow =deserializationSchema.deserialize(singleLineJson.getBytes(StandardCharsets.UTF_8));if (isMergePartition) {int index = seaTunnelRowType.getTotalFields();for (String value : partitionsMap.values()) {seaTunnelRow.setField(index++, value);}}seaTunnelRow.setTableId(tableId);output.collect(seaTunnelRow);} catch (IOException e1) {String errorMsg =String.format("Deserialize this jsonFile data [%s] failed, please check the origin data",singleLineJson);throw new FileConnectorException(FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,errorMsg,e);}// endregion}//endregion}}}
http://www.lryc.cn/news/496746.html

相关文章:

  • Elasticsearch在liunx 中单机部署
  • 深入探索 HarmonyOS 的 Navigation 组件:灵活的页面管理与动态导航
  • 【CUDA】CUDA Hierarchy
  • 28.100ASK_T113-PRO Linux+QT 显示一张照片
  • GitLab使用中遇到的一些问题-记录
  • 【微服务】Docker
  • 【C#】书籍信息的添加、修改、查询、删除
  • Python 入门教程(2)搭建环境 | 2.4、VSCode配置Node.js运行环境
  • Spark常问面试题---项目总结
  • 【AI系统】Auto-Tuning 原理
  • AMEYA360:上海永铭电子全新高压牛角型铝电解电容IDC3系列,助力AI服务器电源高效运转
  • echarts地图立体效果,echarts地图点击事件,echarts地图自定义自定义tooltip
  • 什么是 Socket?
  • 【版本控制】SVN安装到使用一条路讲解
  • KVCKVO
  • PyQt设计界面优化 #qss #ui设计 #QMainWindow
  • Qt Serial Bus 前置介绍篇
  • 12.2深度学习_项目实战
  • LeetCode 64. 最小路径和(HOT100)
  • ESP8266作为TCP客户端或者服务器使用
  • C#结合.NET框架快速构建和部署AI应用
  • 题外话 (火影密令)
  • 蓝桥杯准备训练(lesson1,c++方向)
  • RTDETR融合[ECCV2024]WTConvNeXt中的WTConv模块及相关改进思路
  • AD7606使用方法
  • 嵌入式系统应用-LVGL的应用-平衡球游戏 part1
  • JVM(四) - JVM 内存结构
  • 【AI系统】CANN 算子类型
  • VUE脚手架练习
  • 动态艺术:用Python将文字融入GIF动画