当前位置: 首页 > news >正文

FlinkSql 如何实现数据去重?

摘要

很多时候flink消费上游kafka的数据是有重复的,因此有时候我们想数据在落盘之前进行去重,这在实际开发中具有广泛的应用场景,此处不说详细代码,只粘贴相应的flinksql

代码

--********************************************************************--
-- 创建临时表(只在当前sessoin生效的表称为临时表) DDL
CREATE TEMPORARY TABLE UserAttrSource ( `data` string,`kafkaMetaTimestamp` TIMESTAMP(3) METADATA FROM 'timestamp', -- kafka record携带的源数据时间戳,参考官网kafka connectorproctime as PROCTIME() -- 获取数据处理时间,这是flink内置支持的关键字
) WITH ('connector' = 'kafka','topic' = 'user_attri_ad_dirty_data','properties.bootstrap.servers' = 'kafka地址','scan.startup.mode' = 'timestamp', -- kafka扫描数据模式,参考官网kafka connector'scan.startup.timestamp-millis' ='1687305600000' , -- 2023-06-21 08:00:00'format' = 'raw' -- 意思是将kafka数据格式化为string
);-- 创建SINKCREATE TEMPORARY TABLE ADB (log_date DATE,`errorType` int,appId string,`errorCode` int,`errorReason` string,`deserialization` string,`originalData` string,kafkaMetaTimestamp TIMESTAMP,data_hash string,PRIMARY KEY (`data_hash`) NOT ENFORCED
)
WITH ('connector' = 'adb3.0','url' = 'jdbc:mysql://xxxx:3306/flink_data?rewriteBatchedStatements=true','tableName' = 'usr_attr_dirty', 'userName'='username','password'='password'
);
-- 去重视图, 这是关键(json_value是flink的内置函数,data_hash是数据本身的primary key)
-- 下述语句含义是:根据data_hash字段分组,按照处理时间排序,取出最新的一条数据,其他的重复数据将被抛弃
CREATE TEMPORARY VIEW quchong ASSELECT data,kafkaMetaTimestamp FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY json_value(data,'$.data_hash') ORDER BY proctime DESC) as row_numFROM UserAttrSource)WHERE row_num = 1;--  插入目标表
insert into ADB
select TO_DATE(DATE_FORMAT(kafkaMetaTimestamp,'yyyy-MM-dd') )AS log_date,json_value(data,'$.errorType' RETURNING INT) errorType,json_value(data,'$.appId' NULL ON EMPTY) appId,json_value(data,'$.errorCode'  RETURNING INT) errorCode,json_value(data,'$.errorReason' NULL ON EMPTY) errorReason,json_value(data,'$.deserialization' NULL ON EMPTY) deserialization,json_value(data,'$.originalData') originalData,kafkaMetaTimestamp,json_value(data,'$.data_hash') data_hash
from quchong;
http://www.lryc.cn/news/145319.html

相关文章:

  • 机器学习概念
  • 【数据结构】排序(插入、选择、交换、归并) -- 详解
  • 游戏中的图片打包流程,免费的png打包plist工具,一款把若干资源图片拼接为一张大图的免费工具
  • Springboot实现ENC加密
  • nginx 托管vue项目配置
  • Vue3中如何进行封装?—组件之间的传值
  • 实训笔记8.25
  • vue自定义监听元素宽高指令
  • 网络爬虫到底是个啥?
  • 前端行级元素和块级元素的基本区别
  • CentOS 7用二进制安装MySQL5.7
  • 华为加速回归Mate 60发布, 7nm全自研工艺芯片
  • Linux系列讲解 —— 【systemd】下载及编译记录
  • u-view 的u-calendar 组件设置默认日期后,多次点击后,就不滚动到默认日期的位置
  • vue naive ui 按钮绑定按键
  • Viobot基本功能使用及介绍
  • 《PMBOK指南》第七版12大原则和8大绩效域
  • docker 启动命令
  • C++ DAY7
  • Vue2 使用插件 Volar 报错:<template v-for> key should be placed on the <template> tag.
  • 启动线程方法 start ()和 run ()有什么区别
  • Java的全排列模板
  • 读书笔记——《万物有灵》
  • 面试现场表现:展示你的编程能力和沟通技巧
  • 34亿的mysql表如何优雅的扩字段长度兵并归档重建
  • C#_进程单例模式.秒懂Mutex
  • AcWing 5050. 排序 (每日一题)
  • 【TypeScript】proxy 和 Reflect
  • STM32f103入门(5)定时器中断
  • Mybatis查询数据