当前位置: 首页 > news >正文

二百七十、Kettle——ClickHouse中增量导入清洗数据错误表

一、目的

比如原始数据100条,清洗后,90条正确数据在DWD层清洗表,10条错误数据在DWD层清洗数据错误表,所以清洗数据错误表任务一定要放在清洗表任务之后。

更关键的是,Hive中原本的SQL语句,放在ClickHouse需要大改,头大!而且Kettle任务要想定时增量导入,既与清洗数据错误表最新时间相关,又与DWD层清洗表最新时间相关,搞了大半天才搞定!

二、Hive中原有代码

2.1 表结构

--21、静态排队错误数据表——动态分区  dwd_queue_error
create  table  if not exists  hurys_db.dwd_queue_error(id                  string          comment '唯一ID',device_no           string          comment '设备编号',source_device_type  string          comment '设备类型',sn                  string          comment '设备序列号 ',model               string          comment '设备型号',create_time         string       comment '创建时间',lane_no             int             comment '车道编号',lane_type           int             comment '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道',queue_count         int             comment '排队车辆数',queue_len           float           comment '排队长度(m)',queue_head          float           comment '排队头车距停止线距离(m)',queue_tail          float           comment '排队尾车距停止线距离(m)'
)
comment '静态排队错误数据表——动态分区'
partitioned by (day string)
stored as orc
;

2.2 SQL代码

--动态插入数据
insert  overwrite  table  hurys_db.dwd_queue_error  partition(day)
select
UUID()  as  id,
t2.device_no, t2.source_device_type, t2.sn, t2.model, t2.create_time,t2.lane_no, t2.lane_type,
t2.queue_count, t2.queue_len, t2.queue_head, t2.queue_tail, t2.day
from hurys_db.ods_queue as t2
left join hurys_db.dwd_queue as t3
on t3.device_no=t2.device_no and t3.create_time=t2.create_time and t3.lane_no=t2.lane_no
where t3.device_no is null and t3.create_time is null and t3.lane_no is null and t2.day='2024-09-10'
;

原有Hive代码很简单,然后把代码变成脚本,放在海豚定时调度即可,都很简单!

三、ClickHouse中现有代码

3.1 表结构

--21 静态排队数据错误表(长期存储)
create  table  if not exists  hurys_jw.dwd_queue_error(id                  String                       comment '唯一ID',device_no           String             comment '设备编号',source_device_type  Nullable(String)             comment '设备类型',sn                  Nullable(String)             comment '设备序列号 ',model               Nullable(String)             comment '设备型号',create_time         DateTime                     comment '创建时间',lane_no             Int32              comment '车道编号',lane_type           Nullable(Int32)              comment '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道',queue_count         Int32              comment '排队车辆数',queue_len           Decimal(10, 2)     comment '排队长度(m)',queue_head          Decimal(10, 2)     comment '排队头车距停止线距离(m)',queue_tail          Decimal(10, 2)     comment '排队尾车距停止线距离(m)',day                 Date                         comment '日期'
)
ENGINE = MergeTree
PARTITION BY day
PRIMARY KEY (day,id)
ORDER BY (day,id)
SETTINGS index_granularity = 8192;

注意:由于后面数据清洗记录表需要,因此部分清洗规则的字段不能用Nullable,这也是后面的一大坑!

3.2 SQL代码

select
generateUUIDv4()  as  id,
device_no, source_device_type, sn, model, create_time,
lane_no, lane_type, queue_count, queue_len, queue_head, queue_tail,
cast(day as String) day
from (selectt2.device_no, t2.source_device_type, t2.sn, t2.model,t2.create_time,t2.lane_no, t2.lane_type,t2.queue_count, t2.queue_len, t2.queue_head, t2.queue_tail, toDate(t2.create_time) dayfrom hurys_jw.ods_queue as t2ANTI join hurys_jw.dwd_queue as t3on t3.device_no=t2.device_no and t3.create_time=t2.create_time and t3.lane_no=t2.lane_no)
--where  create_time > ?
;

注意:1 生成uuid字段,Hive中是UUID() as id,而ClickHouse中是generateUUIDv4() as id

           2 ClickHouse中with语句好像不是支持,不知道是不是版本问题

           3 ClickHouse中有ANTI join函数

           4 Kettle里需要把Date字段的day变成cast(day as String) day

3.3 Kettle任务

3.3.1 newtime

获取目标表dwd_queue_error的最新时间create_time

3.3.2 替换NULL值

3.3.3 clickhouse输入

select 
generateUUIDv4()  as  id,
device_no, source_device_type, sn, model, create_time,
lane_no, 
lane_type, queue_count, queue_len, queue_head, queue_tail,
cast(day as String) day
from (
select t2.device_no, t2.source_device_type, t2.sn, t2.model,t2.create_time,t2.lane_no, t2.lane_type,
t2.queue_count, t2.queue_len, t2.queue_head, t2.queue_tail, toDate(t2.create_time) day
from hurys_jw.ods_queue as t2
ANTI join hurys_jw.dwd_queue as t3
on t3.device_no=t2.device_no and t3.create_time=t2.create_time and t3.lane_no=t2.lane_no
)
where  create_time > ?
;

3.3.4 字段选择

3.3.5 newtime3

获取清洗表dwd_queue的最新时间create_time3

3.3.6 替换NULL值3

3.3.7 记录关联 (笛卡尔输出)

注意:清洗表dwd_queue的最新时间create_time3要大于等于目标表dwd_queue_error的最新时间create_time

3.3.8 clickhouse输出

3.3.9 保存后先执行清洗表dwd_queue任务,再执行dwd_queue_error任务

3.3.10 配置海豚调度任务

搞定!!!

http://www.lryc.cn/news/468918.html

相关文章:

  • CentOS6升级OpenSSH9.2和OpenSSL3
  • 2024 年 MathorCup 数学应用挑战赛——大数据竞赛-赛道 A:台风的分类与预测
  • kotlin实现viewpager
  • RabbitMQ最新版本4.0.2在Windows下的安装及使用
  • 东方博宜1180 - 数字出现次数
  • LeetCode: 3274. 检查棋盘方格颜色是否相同
  • datax编译并测试
  • 2-133 基于matlab的粒子群算法PSO优化BP神经网络
  • 复盘秋招22场面试(四)形势重新评估与后续措施
  • 揭开C++ STL的神秘面纱之string:提升编程效率的秘密武器
  • 用人工智能,应该怎么掏钱?
  • 【Axure高保真原型】移动案例
  • Bytebase 3.0.0 - AI 助手全面升级
  • php基础:数据类型、常量、字符串
  • Discuz发布原创AI帖子内容生成:起尔 | AI原创帖子内容生成插件开发定制
  • el-table在某些条件下禁止选中
  • 深入探讨 HTTP 请求方法:GET、POST、PUT、DELETE 的实用指南
  • 深度学习:元学习(Meta-Learning)详解
  • uniapp展示本地pdf + 自定义标题
  • 国标GB28181设备管理软件EasyGBS国标GB28181-2016平台更换SQLite数据库的步骤
  • C++基础与实用技巧第三节:内存管理与性能优化
  • 【移动应用开发】界面设计(二)实现水果列表页面
  • 基于Multisim的四人智力竞赛抢答器设计与仿真
  • 前端学习---(4)js基础-2
  • 重生之“我打数据结构,真的假的?”--3.栈和队列(无习题)
  • 《Python游戏编程入门》注-第2章2
  • PoissonRecon学习笔记
  • 腾讯云DBA面试(一面)
  • Python:背景知识及环境安装
  • 力扣第420周赛 中等 3324. 出现在屏幕上的字符串序列