当前位置: 首页 > news >正文

hive加载csv中字段含有换行符的处理方法

我们从数据库卸数据到csv文件中,再加载到hdfs hive里,然后从csv临时表转换到parquet正式表里。
但如果csv某个字段含有换行符,尽管这个csv字段有双引号括起来了,但Hive还是处理成两行了。查阅了各种文档,都无法解决,于是断定hive是不支持字段里有换行符的csv了。
然后我就想了个办法,先写个脚本把csv中的换行符替换成"\n",
如下:

cat /data/dolphinscheduler/loadOraTable.sh
#!/bin/bash
if [[ $# < 5 ]]
thenecho "usage: $0 oraConnect sql hdfsRoot srcSystem bizDate"echo "e.g.: $0  \"yonbip/yonbip@10.16.10.69:1521/orcl\" \"select * from bond_contract\" \"/dmp/biz\" \"bip\" \"2025-07-27\""exit 1
fi
checkRst(){if [[ $? != 0 ]]thenecho "--- check failed"exit 1elseecho "--- check success"fi
}
#解析参数
pgConnect=$1
sql=$2
dmpRoot=$3
srcSystem=$4
bizDate=$5
echo "===== got input params:"
echo "pgConnect: $pgConnect"
echo "sql: $sql"
echo "dmpRoot: $dmpRoot"
echo "srcSystem: $srcSystem"
echo "bizDate: $bizDate"
echo "===== parsed params:"
tableName=$(echo $sql | awk -F ' from ' '{print $2}' |awk -F ' ' '{print $1}')
if [ -z $tableName ]; thentableName=$(echo $sql | awk -F ' FROM ' '{print $2}' |awk -F ' ' '{print $1}')
fi
if [[ $tableName == *.* ]]
thentableName=$(echo $tableName | awk -F '.' '{print $2}')
fi
echo "tableName: $tableName"
echo "===== end of params"echo "1.尝试删除残留文件"
rm -f ~/${tableName}.csvecho "2.1 导出数据到csv文件"
pgCmd="python3 /data/dolphinscheduler/oraQueryToCsv.py \"${pgConnect}\" ${tableName} \"$sql\""
echo "command: $pgCmd"
python3 /data/dolphinscheduler/oraQueryToCsv.py "${pgConnect}" ${tableName} "$sql"
checkRstexport dCount=$(awk 'END {print NR}' ~/${tableName}.csv)
echo "Lines in file ~/${tableName}.csv: $dCount"
if (( $dCount < 1 )); thenrm -f ~/${tableName}.csvecho "No data got from database, no need to go ahead."exit 61
fiecho "2.2 9999-12-31去掉时间"
sed -i 's/9999-12-31 23:59:59/9999-12-31/g' ~/${tableName}.csvecho "2.3 字段中的换行符替换成\n"
python3 /data/dolphinscheduler/removeInColNewLine.py ~/${tableName}.csvecho "3.尝试清除hdfs旧文件"
hdfs dfs -rm -r ${dmpRoot}/tmp/${srcSystem}/${tableName}/${bizDate}echo "4.尝试创建hdfs文件目录"
hdfs dfs -mkdir -p ${dmpRoot}/tmp/${srcSystem}/${tableName}/${bizDate}echo "5.上传本地文件到hdfs"
hdfs dfs -put ~/${tableName}.csv ${dmpRoot}/tmp/${srcSystem}/${tableName}/${bizDate}/
checkRstecho "6.清除本地临时文件"
rm -f ~/${tableName}.csv
cat /data/dolphinscheduler/removeInColNewLine.py
#!/usr/bin/python
# -*- coding:utf-8 -*-#处理csv文件中换行符等特殊字符(\r\n,\n,\r,\\)
#python csv_handler.py filepathimport os
import sys
import csv
import codecs
import timefilename = sys.argv[1]print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),'[', filename.encode('unicode_escape').decode(), ']开始处理')with open(filename, 'r') as srcFile, open(filename + '.tmp', 'w') as dstFile:#读取csv文件的每一行fileReader = csv.reader(srcFile, dialect='excel')fileWriter = csv.writer(dstFile, dialect='excel')colSet = set()for d in list(fileReader):for ii,dd in enumerate(d):if dd.find('\r\n') != -1:dd = dd.replace('\r\n', '\\\\n')colSet.add(ii)if dd.find('\n') != -1:colSet.add(ii)dd = dd.replace('\n', '\\\\n')if dd.find('\r') != -1:colSet.add(ii)dd = dd.replace('\r', '\\\\n')
#            if dd.find('\\') != -1:
#                dd = dd.replace('\\', '')d[ii] = ddfileWriter.writerow(d)if len(colSet) > 0:print('new line cols:' + str(colSet))dstFile.close()srcFile.close()#删除原文件,.tmp文件重命名为原文件
os.remove(filename)
os.rename(filename + '.tmp', filename)print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),'[', filename.encode('unicode_escape').decode(), ']处理完成')

上面这个脚本在替换列中含有的换行符时,也记录下列号了,最后如果有列号,出打印出一行:

new line cols:{x,y}

如:

Lines in file ~/org_corp.csv: 534
2.2 9999-12-31去掉时间
2.3 字段中的换行符替换成\n
2025-08-14 11:40:23 [ /home/hive/org_corp.csv ]开始处理
new line cols:{2,7}
2025-08-14 11:40:23 [ /home/hive/org_corp.csv ]处理完成
3.尝试清除hdfs旧文件
Deleted /dmp/other/tmp/bip/org_corp/2025-08-13
4.尝试创建hdfs文件目录
5.上传本地文件到hdfs
--- check success
6.清除本地临时文件

然后在任务最后,从输出信息里滤出“new line cols:{2,7}”这一行,解析出列号list,输出到参数newLineColNums里面。

ret=`/data/dolphinscheduler/loadOraTable.sh "${dbConnect}" "select ${slctColums} from ${SRC_DB}.${tableName} t " "/dmp/${DMP_DB}" "${srcSystem}" "${bizDate}"`
echo $ret
colsNum=$(echo $ret |grep "new line cols:" | awk -F 'new line cols:{' '{print $2}' | awk -F '}' '{print $1}')
if (( -n $colsNum ))
thenecho "#{setValue(newLineColNums=${colsNum})}"
fi
echo "newLineColNums: $colsNum"

加载到临时表中,从临时表抽取数据时,如果newLineColNums参数不空,则把对应的列加上replace函数,把\n替换回换行符。
任务脚本:

if [ -n "${newLineColNums}" ]
thennlRet=`sh /data/dolphinscheduler/restoreNLFunc.sh "${newLineColNums}" "${slctColums}"`newLineCols=`echo $nlRet | awk -F 'SQL:' '{print $2}'`echo "parsed newLineCols: $newLineCols"
elsenewLineCols="${slctColums}"
fi
hive -e "insert overwrite table ${DMP_DB}.ods_${srcSystem}_${tableName} \
SELECT ${newLineCols} \
from ${DMP_DB}.tmp_${srcSystem}_${tableName} t \
where ${tableId} != '' and ${tableId} is not null;"

基中,slctColums是列名,如"t.id, t.code,t.name, t.memo"
newLineColNums是前面任务输出的参数,如"2,3"

cat /data/dolphinscheduler/restoreNLFunc.sh
if [[ $# < 2 ]]
thenecho "usage: $0 colNums colsName"echo "e.g.: $0 \"new line cols:{1,2}\" \"id,is_set,zl_office_id\""exit 1
fi#colsNum=`echo $1 | awk -F 'new line cols:{' '{print $2}' | awk -F '}' '{print $1}'`
colsNum=$1
colNames=$2
colNmArr=(${colNames//,/ })
echo "${colNmArr[@]}"if [ -n "$colsNum" ]
thenecho "有列含有换行符:$colsNum"colNoArr=(${colsNum//,/ })for coln in ${colNoArr[@]}doecho "col: ${colNmArr[coln]}"colNmArr[coln]="replace(${colNmArr[coln]},'\\\\n', '\\n')"echo "col: ${colNmArr[coln]}"done
fi
colNames=$(IFS=,; echo "${colNmArr[*]}")
echo "SQL:$colNames"

如:

sh restoreNLFunc.sh "1,2" "col1,col2,col3"
col1 col2 col3
有列含有换行符:1,2
col: col2
col: replace(col2,'\\\\n','\\n')
col: col3
col: replace(col3,'\\\\n','\\n')
SQL:col1,replace(col2,'\\\\n','\\n'),replace(col3,'\\\\n','\\n')

最后再用如下语句转到Parquet格式的正式表中,就能看到正常的换行了。

insert into table1 select col1,replace(col2,'\\\\n','\\n'),replace(col3,'\\\\n','\\n') from tmp_table1
http://www.lryc.cn/news/621239.html

相关文章:

  • Spring-cloud-openfeign-设置超时时间
  • 数据结构:用两个栈模拟队列(Queue Using 2 Stacks)
  • 8.14网络编程——TCP通信基础
  • 【22-决策树】
  • 零基础-动手学深度学习-10.3. 注意力评分函数
  • 20道CSS相关前端面试题及答案
  • torch.nn中Sequential的使用
  • 【代码随想录day 20】 力扣 538.把二叉搜索树转换为累加树
  • CMake语法与Bash语法的区别
  • 扩展用例-失败的嵌套
  • 流式数据服务端怎么传给前端,前端怎么接收?
  • jenkins在windows配置sshpass
  • 设计模式笔记_行为型_状态模式
  • 【JavaEE】多线程 -- 线程状态
  • 纸箱拆垛:物流自动化中的“开箱密码”与3D视觉的智能革命
  • 面试题之项目中灰度发布是怎么做的
  • Flink on YARN启动全流程深度解析
  • 会议通信系统核心流程详解(底稿1)
  • Linux软件编程:进程和线程
  • C#面试题及详细答案120道(01-10)-- 基础语法与数据类型
  • Flink Stream API 源码走读 - socketTextStream
  • 2025H1手游市场:SLG领涨、休闲爆发,何为出海新航道?
  • 广告灯的左移右移
  • Day43 复习日
  • FPGA+护理:跨学科发展的探索(五)
  • Kotlin Data Classes 快速上手
  • 【深度学习】深度学习基础概念与初识PyTorch
  • 报数游戏(我将每文更新tips)
  • IPTV系统:开启视听与管理的全新篇章
  • 14 ABP Framework 文档管理