当前位置: 首页 > news >正文

二百零七、Flume——Flume实时采集5分钟频率的Kafka数据直接写入ODS层表的HDFS文件路径下

一、目的

在离线数仓中,需要用Flume去采集Kafka中的数据,然后写入HDFS中。

由于每种数据类型的频率、数据大小、数据规模不同,因此每种数据的采集需要不同的Flume配置文件。玩了几天Flume,感觉Flume的使用难点就是配置文件

二、使用场景

转向比数据是数据频率为5分钟的数据类型代表,数据量很小、频率不高,因此搞定了转向比数据的采集就搞定了这一类低频率数据的实时采集问题

1台设备每日的转向比数据规模是30KB,25台设备的数据规模则是750KB

三、转向比数据ODS层建表

create external table  if not exists  ods_turnratio(turnratio_json  string
)
comment '转向比数据外部表——静态分区'
partitioned by (day string)
row format delimited fields terminated by '\x001'
lines terminated by '\n'
stored as SequenceFile
tblproperties("skip.header.line.count"="1");

四、转向比数据的配置文件

## agent a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1

## configure source s1
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.kafka.bootstrap.servers = 192.168.0.27:9092
a1.sources.s1.kafka.topics = topic_b_turnratio
a1.sources.s1.kafka.consumer.group.id = turnratio_group
a1.sources.s1.kafka.consumer.auto.offset.reset = latest
a1.sources.s1.batchSize = 1000

## configure channel c1
## a1.channels.c1.type = memory
## a1.channels.c1.capacity = 10000
## a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/data/flumeData/checkpoint/turnratio
a1.channels.c1.dataDirs = /home/data/flumeData/flumedata/turnratio

## configure sink k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hurys23:8020/user/hive/warehouse/hurys_dc_ods.db/ods_turnratio/day=%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = turnratio
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.rollSize = 62500
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 600
a1.sinks.k1.hdfs.minBlockReplicas = 1

## Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

注意:62500约为61KB

五、Flume写入HDFS结果

Flume根据时间戳按照ODS层表的分区,将数据写入对应HDFS文件

25台设备,50分钟1个文件,文件大小66.18 KB 

六、ODS表刷新分区后查验数据

(一)刷新表分区

MSCK REPAIR TABLE ods_turnratio;

(二)查看表数据

select * from ods_turnratio;

(三)验证数据完整性

--2023-11-19 数据基本完整  23时297条 标准300  少3条
--2023-11-20 数据基本完整  23时299条 标准300  少1条

数据基本完整,尤其是调度文件大小之后

19日a1.sinks.k1.hdfs.rollSize = 31250        数据基本完整 23时297条 标准300 少3条

20日a1.sinks.k1.hdfs.rollSize = 62500        数据基本完整 23时299条 标准300 少1条

七、注意点

(一)配置文件中的重点是红色标记的几点

a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.rollSize = 62500
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 600
a1.sinks.k1.hdfs.minBlockReplicas = 1

(二)任务配置文件中rollSize参数设置可大不可小

rollSize参数小的话数据会丢失,大的话没问题

配置文件的参数还是不断调试中,争取调到最优的状态。能够及时、完整的消费Kafka数据,并且能够最大化的利用HDFS资源。

目前就先这样,如果有问题的话后面再更新!!!

http://www.lryc.cn/news/240425.html

相关文章:

  • 【实验】配置用户自动获取IPv6地址的案例
  • 手撕A*算法(详解A*算法)
  • 1688API如何获取商品详情信息(关键词搜索商品列表),1688API接口开发系列
  • 〖大前端 - 基础入门三大核心之JS篇㊶〗- DOM事件传播和事件监听方法addEventListener()
  • Cartographer实现双雷达建图
  • (离散数学)主析取范式
  • Communications link failure
  • XC3320 离线式、无电感交流输入线性稳压器 可替代KP3310 固定5V输出电压
  • 导购APP、淘客查券机器人与淘客系统:全面对比与选择
  • 飞翔的鸟游戏
  • 【SpringCloud】为什么选择微服务?
  • 基于Python实现汽车销售数据可视化+预测【500010086.1】
  • 干货分享:好用的两款封面制作工具
  • 模版模式 设计模式
  • MySQL锁机制
  • webpack loader
  • Java—学生信息管理系统(简单、详细)
  • Spring第一课,了解IDEA里面的文件,回顾Cookie和Session,获取Session,Cookie,Header的方式
  • AcWing113.特殊排序
  • 数据仓库岗面试
  • 企业建数仓的第一步是选择一个好用的ETL工具
  • 行情分析 - - 加密货币市场大盘走势(11.23)
  • 穿山甲SDK 集成·android接入广告·app流量变现
  • 深度学习模型训练计算量的估算
  • 【实验笔记】C语言实验——降价提醒机器人
  • YOLOv5分割训练,从数据集标注到训练一条龙解决
  • 再添千万级罚单,某银行年内罚款过亿!金融行业合规问题亟待解决
  • 配置Nginx服务器用于Web应用代理和SSL{仅配置文件}
  • 【广州华锐互动】VR溺水预防教育:在虚拟世界中学会自救!
  • Si(111)衬底上脉冲激光沉积AlN外延薄膜的界面反应控制及其机理