
Data Collection (Full and Incremental)

Full collection: collect all of the data.

3. Full collection

vim students_all.json
{
    "job": {
        "setting": {
            "speed": {"channel": 1},
            "errorLimit": {"record": 0, "percentage": 0.02}
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "splitPk": "id",
                        "column": ["id", "name", "age", "gender", "clazz", "update_time"],
                        "connection": [
                            {
                                "table": ["students"],
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata31"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://master:9000",
                        "fileType": "text",
                        "path": "/data/students_all/dt=${dt}",
                        "fileName": "students",
                        "column": [
                            {"name": "id", "type": "STRING"},
                            {"name": "name", "type": "STRING"},
                            {"name": "age", "type": "INT"},
                            {"name": "gender", "type": "STRING"},
                            {"name": "clazz", "type": "STRING"},
                            {"name": "update_time", "type": "STRING"}
                        ],
                        "writeMode": "truncate",
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}

# Create the partition directory
hdfs dfs -mkdir -p /data/students_all/dt=2024-10-21
# Run the DataX job
datax.py -p"-Ddt=2024-10-21" students_all.json
# Add the partition to the Hive table
hive -e "alter table students_all add if not exists partition(dt='2024-10-21');"
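The three commands above can be wrapped into one script that takes the partition date as a parameter. A minimal sketch, assuming `hdfs`, `datax.py`, and the `hive` CLI are on the PATH as in the commands above; the `run` helper is an illustrative addition that only echoes each command, so the flow can be checked without a cluster:

```shell
#!/usr/bin/env bash
# full_collect.sh -- run the full-collection job for one partition date (dry run).
set -euo pipefail

dt=${1:-2024-10-21}                 # partition date, e.g. 2024-10-21

run() { echo "+ $*"; }              # dry-run wrapper; replace echo with "$@" to execute

run hdfs dfs -mkdir -p "/data/students_all/dt=$dt"
run datax.py -p"-Ddt=$dt" students_all.json
run hive -e "alter table students_all add if not exists partition(dt='$dt');"
```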

Incremental collection: collect only rows that were newly inserted or modified.

1. The source table needs an update-time column

CREATE TABLE `students` (
    `id` bigint(20),
    `name` varchar(255),
    `age` bigint(20),
    `gender` varchar(255),
    `clazz` varchar(255),
    `update_time` datetime NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
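Because of `DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP`, MySQL maintains `update_time` automatically: it is set on insert and refreshed on every update, so the incremental job can filter on it with no application-side bookkeeping. An illustrative sequence (the sample values are hypothetical):

```sql
-- INSERT: update_time is filled in by DEFAULT CURRENT_TIMESTAMP
insert into students (id, name, age, gender, clazz)
values (1, 'zhangsan', 18, 'M', 'class01');

-- UPDATE: MySQL bumps update_time to the modification time automatically
update students set age = 19 where id = 1;

-- Rows touched on a given day -- exactly what the incremental reader's
-- "where" condition selects
select * from students where substr(update_time, 1, 10) = '2024-10-22';
```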

2. Create a partitioned table in Hive

create external table if not exists students_all(
    id bigint comment 'student id',
    name string comment 'student name',
    age bigint comment 'student age',
    sex string comment 'student gender',
    clazz string comment 'student class',
    update_time string comment 'update time'
) comment 'student information table'
partitioned by (dt string)
row format delimited fields terminated by ','
stored as textfile
location 'hdfs://master:9000/data/students_all';

4. Create the incremental table

create external table if not exists students_acc(
    id bigint comment 'student id',
    name string comment 'student name',
    age bigint comment 'student age',
    sex string comment 'student gender',
    clazz string comment 'student class',
    update_time string comment 'update time'
) comment 'student information table'
partitioned by (dt string)
row format delimited fields terminated by ','
stored as textfile
location 'hdfs://master:9000/data/students_acc';

5. Incrementally collect the updated data

vim students_acc.json
{
    "job": {
        "setting": {
            "speed": {"channel": 1},
            "errorLimit": {"record": 0, "percentage": 0.02}
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "splitPk": "id",
                        "where": "substr(update_time,1,10)='${dt}'",
                        "column": ["id", "name", "age", "gender", "clazz", "update_time"],
                        "connection": [
                            {
                                "table": ["students"],
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata31"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://master:9000",
                        "fileType": "text",
                        "path": "/data/students_acc/dt=${dt}",
                        "fileName": "students",
                        "column": [
                            {"name": "id", "type": "STRING"},
                            {"name": "name", "type": "STRING"},
                            {"name": "age", "type": "INT"},
                            {"name": "gender", "type": "STRING"},
                            {"name": "clazz", "type": "STRING"},
                            {"name": "update_time", "type": "STRING"}
                        ],
                        "writeMode": "truncate",
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}
# Create the partition directory
hdfs dfs -mkdir -p /data/students_acc/dt=2024-10-22
# Run the DataX job
datax.py -p"-Ddt=2024-10-22" students_acc.json
# Add the partition to the Hive table
hive -e "alter table students_acc add if not exists partition(dt='2024-10-22');"
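If several days of increments need to be collected (for example, a backfill), the same three commands can be looped over a date range. A sketch assuming GNU `date` for the date arithmetic; as in the commands above, `hdfs`, `datax.py`, and `hive` are assumed to be on the PATH, and the `run` helper only echoes each command:

```shell
#!/usr/bin/env bash
# Backfill the incremental job for every date in [start, end] (dry run).
set -euo pipefail

start=${1:-2024-10-22}
end=${2:-2024-10-24}

run() { echo "+ $*"; }   # dry-run wrapper; replace echo with "$@" to execute

d="$start"
while [ "$(date -d "$d" +%s)" -le "$(date -d "$end" +%s)" ]; do
    run hdfs dfs -mkdir -p "/data/students_acc/dt=$d"
    run datax.py -p"-Ddt=$d" students_acc.json
    run hive -e "alter table students_acc add if not exists partition(dt='$d');"
    d=$(date -d "$d +1 day" +%F)   # GNU date: next calendar day
done
```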

6. Merge the data

vim student_merge.sql
insert overwrite table students_all partition(dt='${dt}')
select id, name, age, sex, clazz, update_time
from (
    select id, name, age, sex, clazz, update_time,
           row_number() over (partition by id order by update_time desc) as r
    from (
        select * from students_all where dt = '${diff_dt}'
        union all
        select * from students_acc where dt = '${dt}'
    ) as a
) as b
where r = 1;
# Run the merge with Hive
hive -f student_merge.sql -d dt=2024-10-22 -d diff_dt=2024-10-21

# Or run the same script with Spark SQL
spark-sql \
--master yarn \
--deploy-mode client \
--num-executors 2 \
--executor-cores 1 \
--executor-memory 2G \
--conf spark.sql.shuffle.partitions=1 \
-f student_merge.sql -d dt=2024-10-22 -d diff_dt=2024-10-21
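The merge takes two dates: `dt`, the partition being built, and `diff_dt`, the previous day's full partition. Rather than hard-coding both, `diff_dt` can be derived from `dt`, assuming GNU `date`:

```shell
#!/usr/bin/env bash
set -euo pipefail

dt=${1:-2024-10-22}
diff_dt=$(date -d "$dt -1 day" +%F)   # previous day's full partition

echo "dt=$dt diff_dt=$diff_dt"
# hive -f student_merge.sql -d dt="$dt" -d diff_dt="$diff_dt"
```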
