当前位置: 首页 > news >正文

二百七十一、Kettle——ClickHouse增量导入数据清洗记录表

一、目的

在完成错误数据表任务后,需要对每条错误数据的错误字段及其字段值进行分析

Hive中原有SQL语句和ClickHouse现有SQL语句很大不同

二、Hive中原有代码

2.1 表结构

--31、静态排队数据清洗记录表
create  table  if not exists  hurys_db.dwd_data_clean_record_queue(id             string     comment '唯一ID',data_type      int        comment '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',device_no      string     comment '设备编号',create_time    string  comment '创建时间',field_name     string     comment '字段名',field_value    string     comment '字段值'
)
comment '静态排队数据清洗记录表'
partitioned by (day string)
stored as orc
;

2.2 SQL代码

with t3 as(
selectid,device_no,case when device_no is null then CONCAT('device_no:','null')  END AS device_no_value,create_time,case when lane_no < 0 or lane_no >255 then CONCAT('lane_no:', CAST(lane_no AS STRING)) END AS lane_no_value,case when queue_len < 0 or queue_len > 500 then CONCAT('queue_len:', CAST(queue_len AS STRING))  END AS queue_len_value,case when queue_head < 0 or queue_head > 500 then  CONCAT('queue_head:', CAST(queue_head AS STRING))  END AS queue_head_value,case when queue_tail < 0 or queue_tail > 500 then  CONCAT('queue_tail:', CAST(queue_tail AS STRING))  END AS queue_tail_value,case when queue_count < 0 or queue_count > 100  then  CONCAT('queue_count:', CAST(queue_count AS STRING))  END AS queue_count_value,concat_ws(',',case when device_no is null then CONCAT('device_no:','null') end ,case when lane_no < 0 or lane_no >255 then CONCAT('lane_no:', CAST(lane_no AS STRING)) END ,case when queue_len < 0 or queue_len > 500 then CONCAT('queue_len:', CAST(queue_len AS STRING))  END,case when queue_head < 0 or queue_head > 500 then  CONCAT('queue_head:', CAST(queue_head AS STRING))  END,case when queue_tail < 0 or queue_tail > 500 then  CONCAT('queue_tail:', CAST(queue_tail AS STRING))  END,case when queue_count < 0 or queue_count > 100  then  CONCAT('queue_count:', CAST(queue_count AS STRING))  END) AS kv_pairs  ,day
from hurys_db.dwd_queue_errorwhere day='2024-09-10'
)
insert  overwrite  table  hurys_db.dwd_data_clean_record_queue partition(day)
selectid,'6' data_type,t3.device_no,create_time,split(pair, ':')[0] AS field_name,split(pair, ':')[1] AS field_value,day
from t3
lateral view explode(split(t3.kv_pairs , ',')) exploded_table AS pair
where device_no_value is not null or queue_len_value is not null or lane_no_value is not null
or queue_head_value is not null or queue_tail_value is not null or queue_count_value is not null
;

三、ClickHouse中现有代码

3.1 表结构

--31、静态排队数据清洗记录表(长期存储)
create  table  if not exists  hurys_jw.dwd_data_clean_record_queue(id             String            comment '唯一ID',data_type      Nullable(Int32)      comment '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',device_no      Nullable(String)     comment '设备编号',create_time    DateTime          comment '创建时间',field_name     Nullable(String)     comment '字段名',field_value    Nullable(String)     comment '字段值',day            Date                 comment '日期'
)
ENGINE = MergeTree
PARTITION BY day
PRIMARY KEY (day,id)
ORDER BY (day,id)
SETTINGS index_granularity = 8192;

3.2 SQL代码

SELECTid,'6' AS data_type,device_no,create_time,splitByString(':', pair)[1] AS field_name,splitByString(':', pair)[2] AS field_value,day
FROM (SELECTid,device_no,create_time,day,arrayConcat(if(device_no IS NULL, ['device_no:null'], []),if(lane_no < 0 OR lane_no > 255, [concat('lane_no:', toString(lane_no))], []),if(queue_len < 0 OR queue_len > 500, [concat('queue_len:', toString(queue_len))], []),if(queue_head < 0 OR queue_head > 500, [concat('queue_head:', toString(queue_head))], []),if(queue_tail < 0 OR queue_tail > 500, [concat('queue_tail:', toString(queue_tail))], []),if(queue_count < 0 OR queue_count > 100, [concat('queue_count:', toString(queue_count))], [])) AS pairsFROM hurys_jw.dwd_queue_errorWHERE device_no IS NULL ORlane_no < 0 OR lane_no > 255 OR   queue_len < 0 OR queue_len > 500 ORqueue_head < 0 OR queue_head > 500 OR  queue_tail < 0 OR queue_tail > 500 ORqueue_count < 0 OR queue_count > 100
) AS subquery
array join pairs AS pair
;

注意:1、错误数据表dwd_queue_error的清洗字段不能设置nullable,这是一大坑

           2、如果错误数据表中的清洗字段是Decimal(10,1),那么相关字段就要调整

arrayConcat(if(device_no IS NULL, ['device_no:null'], []),if(lane_no < 0 OR lane_no > 255, [concat('lane_no:', toString(lane_no))], []),if(azimuth < 0 OR azimuth > toDecimal32(359.9,1), [concat('azimuth:', toString(azimuth))], []),if(rcs < -64 OR rcs > toDecimal32(63.5,1), [concat('rcs:', toString(rcs))], []),if(prob < 0 OR prob > 100, [concat('prob:', toString(prob))], [])
) AS pairs

3.3 Kettle任务

3.3.1 newtime

3.3.2 替换NULL值

3.3.3 clickhouse输入

3.3.4 字段选择

3.3.5 clickhouse输出

3.3.6 执行任务

3.3.7 海豚调度

由于不需要实时记录,因为把所有数据的清洗记录任务放在一个海豚工作流里面,T+1执行即可!

http://www.lryc.cn/news/468689.html

相关文章:

  • 为什么说Tcp是面向字节流的以及(Tcp粘包问题、TCP/UDP对比、listen函数的backlog参数的意义)
  • Flink PostgreSQL CDC源码解读:深入理解数据流同步
  • 系统架构设计师 软件架构的定义与生命周期
  • 从零开始使用Surya-OCR最新版本0.6.1——最强文本检测模型:新添表单表格检测识别
  • linux中级wed服务器(https搭建加密服务器)
  • 聊一聊为什么企业数字化转型总是三天热度
  • 2025年NPDP产品经理认证考试时间和报考条件
  • 微信小程序文字转语音播报案例
  • QT SSDP 局域网检测支持扫描通信
  • python_学习2(仅为本人学习记录)
  • 手动将python的flask程序打包成exe在windows上执行
  • 老生常谈,MySQL事务隔离级别
  • 百度翻译以及另外三款翻译工具推荐!!!
  • Redis JSON介绍和命令大全
  • yolo自动化项目实例解析(八)自建UI-键鼠录制回放
  • C++ 面向对象知识汇总(超详细)
  • stm32使用SIM900A模块实现MQTT对接远程服务器
  • MATLAB Simulink (一)直接序列扩频通信系统
  • 标准数字隔离器主要特性和应用---腾恩科技
  • Spring事务的七种传播行为
  • win10怎么卸载软件干净?电脑彻底删除软件的方法介绍,一键清理卸载残留!
  • excel中,将时间戳(ms或s)转换成yyyy-MM-dd hh:mm.ss或毫秒格式
  • 机房巡检机器人有哪些功能和作用
  • Redis Search系列 - 第一讲 创建索引
  • bat 重置 Navicat 试用
  • 【真题笔记】09-12年系统架构设计师要点总结
  • Node + HTML搭建自己的ChatGPT [基础版]
  • 关于小程序审核需要提交订单列表页面path的修改办法
  • 使用 Nginx 在同一端口部署两个前端项目并配置子路径
  • 怎么选择独立站SEO效果好的wordpress模板