大数据之Flume
Flume
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。
架构图
一、Flume基础架构
1.1 Agent
Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。
Agent主要有3个部分组成,Source、Channel、Sink。
1.2 Source
Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、 taildir 、sequence generator、syslog、http、legacy。
1.3 Sink
Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。
Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。
1.4 Channel
Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。
Flume自带两种Channel:Memory Channel和File Channel。
Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
1.5 Event
传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由**Header和Body**两部分组成,Header用来存放该event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组。
二、Flume安装
2.1 安装地址
- Flume官网地址:http://flume.apache.org/
- 文档查看地址:http://flume.apache.org/FlumeUserGuide.html
- 下载地址:http://archive.apache.org/dist/flume/
2.2 安装部署
-
将apache-flume-1.9.0-bin.tar.gz上传到linux的/opt/software目录下
-
解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下
tar -zxvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
-
修改apache-flume-1.9.0-bin的名称为flume
mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume-1.9.0
-
将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3
rm /opt/module/flume-1.9.0/lib/guava-11.0.2.jar
-
环境变量
[root@hadoop102 flume-1.9.0]# vim /etc/profile.d/my_env.sh--- FLUME_HOME=/opt/module/flume-1.9.0 PATH=$PATH:$FLUME_HOME/binexport FLUME_HOME --- [atguigu@hadoop102 flume-1.9.0]$ source /etc/profile.d/my_env.sh
三、Flume入门案例
3.1 监控端口数据官方案例
需求:使用Flume监听一个端口,收集该端口数据,并打印到控制台
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#a-simple-example
实现步骤
[atguigu@hadoop102 ~]$ cd /opt/module/flume-1.9.0/
[atguigu@hadoop102 flume-1.9.0]$ mkdir data && cd data
[atguigu@hadoop102 data]$ vim netcat-flume-logger.conf
---
# 给Agent各个组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 配置Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# 配置Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 配置Sink
a1.sinks.k1.type = logger# 组合
a1.sources.r1.channels = c1
a1.sources.k1.channel = c1# ChannelSelector Sink Processor Interceptor
---# flume-ng 是flume bin目录下的命令, 我们已配环境变量
# --conf/-c 表示配置文件存储在conf目录下, 指定配置文件的位置, 此参数缩写是-c
# --conf-file/-f 表示flume本次启动读取的配置文件是data文件夹下的netcat-flume-logger.conf
# --name/-n 表示给agent起名为a1 (对应netcat-flume-logger.conf文件里的a1, 因为我们都是a1.sources、a1.sinks等等)
# -Dflume.root.logger=INFO,console: -D表示flume运行时动态修改flume.root.logger参数属性值,将info级别的日志打到console上
[atguigu@hadoop102 flume-1.9.0]$ flume-ng agent --conf conf --conf-file data/netcat-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
# 可以看到Flume启动在44444
6%A1%88%E4%BE%8B-Flume%E5%90%AF%E5%8A%A8%E5%9B%BE.png&pos_id=img-GYuqX8A0-1754486100991)
# 另开窗口使用nc连上44444
[atguigu@hadoop102 ~]$ nc localhost 44444
hello
nihao
111
222
333
byte
byebye
3.2 实时监控单个追加文件
需求:实时监控Hive日志,并上传到HDFS中
实现步骤
[atguigu@hadoop102 data]$ vim exec-flume-logger.conf
---
# 给Agent各个组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Sink
a1.sinks.k1.type = logger# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
---[atguigu@hadoop102 flume-1.9.0]$ flume-ng agent -c conf -f data/exec-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
测试手动追加
[atguigu@hadoop102 logs]$ echo aaa >> hive.log
[atguigu@hadoop102 logs]$ echo bbb >> hive.log
[atguigu@hadoop102 logs]$
测试自动追加
[atguigu@hadoop102 ~]$ beeline -u jdbc:hive2://hadoop102:10000 -n atguigu
0: jdbc:hive2://hadoop102:10000> use default
# 无论是正常日志还是错误日志都会被flume采集到,这样就实现了从文件里采集追加的数据到我们的控制台。
# flume命令后跟的Info只是说Flume运行日志的Info级别而非监控的文件(hive.log)里数据, Hive的Error打印了,是因为Hive的日志级别导致Error打印到hive.log
# 从而被flume采集, 只要数据到hive.log flume就采集, hive.log里有啥和flume没关系
接下来实现上传到HDFS
[atguigu@hadoop102 data]$ cp exec-flume-logger.conf exec-flume-hdfs.conf
# 改一下sink
[atguigu@hadoop102 data]$ vim exec-flume-hdfs.conf
---
# Sink
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a1.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a1.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k2.hdfs.rollCount = 0
---
[atguigu@hadoop102 flume-1.9.0]$ flume-ng agent -c conf -f data/exec-flume-hdfs.conf -n a1
正在使用的文件后缀为.tmp,可改
注意:
对于所有与时间相关的转义序列,Event Header中必须存在以 “timestamp”的key(除非hdfs.useLocalTimeStamp设置为true,此方法会使用TimestampInterceptor自动添加timestamp)。
-
type: 组件类型名称,hdfs
-
hdfs.path: 捡来把数据写到HDFS的什么位置
采集数据后,把数据写到HDFS某个位置,我是一直往某个文件里写吗?还是一个Event写一个文件呢?
如果一直往一个文件里写,这个文件就会越来越大,不合适;一个Event一个文件呢,这又会造成很多小文件的场景。
-
hdfs.filePrefix、hdfs.fileSuffix: 将来你在HDFS创建目录的时候,你可以指定一个文件的前缀、后缀标识
-
hdfs.inUsePrefix、hdfs.inUseSuffix: 将来给你正在使用的文件加前缀、后缀标识,inUseSuffix默认是.tmp
-
hdfs.rollInterval: 滚动时间间隔。往HDFS写数据的时候,多少秒给你生成一个文件,例如30秒一个文件。时间层面
-
hdfs.rollSize: 文件达到多大的时候触发滚动,例如当文件达到1024 byte的时候去触发滚动。大小层面
-
hdfs.rollCount: 写进去多少个时间的时候触发滚动,例如写了10个Event的时候触发滚动。一般不用。
一条数据封装为一个Event,这个大小不好衡量,通常情况下1~2KB。但特殊情况Event大小不确定,就不能通过事件的个数去断定数据有多少。所以一般根据时间层面或大小层面去进行滚动。例如让他128M滚动一次。
-
hdfs.round、hdfs.roundValue、hdfs.roundUnit: 这个是对于文件夹的滚动,它可以不用一直往一个目录下写文件,可以往多个目录下写文件。round是bool,是否滚动,roundValue是值,roundUnit是单位,例如roundValue是1,roundUnit是second,那就是1秒钟滚动一次。
-
hdfs.useLocalTimeStamp: 本地时间戳,经常用,因为滚动涉及到时间层面
3.3 实时监控目录下多个文件
需求:使用Flume监听整个目录的文件,并上传至HDFS
[atguigu@hadoop102 data]$ cp exec-flume-hdfs.conf spooling-flume-hdfs.conf
# 改一下source
[atguigu@hadoop102 data]$ vim spooling-flume-hdfs.conf
---
# Source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/module/flume-1.9.0/data/spoolingDir
a1.sources.r1.fileSuffix = .COMPLETED
#忽略所有以.tmp结尾的文件,不上传
a1.sources.r1.ignorePattern = ([^ ]*\.tmp)
---[atguigu@hadoop102 data]$ mkdir spoolingDir && cd spoolingDir
[atguigu@hadoop102 flume-1.9.0]$ flume-ng agent -c conf -f data/spooling-flume-hdfs.conf -n a1
[atguigu@hadoop102 data]$ cp exec-flume-hdfs.conf ./spoolingDir/
[atguigu@hadoop102 data]$ ls spoolingDir/
exec-flume-hdfs.conf.COMPLETED
说明:在使用Spooling Directory Source时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED结尾;被监控文件夹每500毫秒扫描一次文件变动。完全以后缀区分,如果直接创建.COMPLETED后缀的文件,它不会再去采集。如果a文件采集过了变为了a.COMPLETED,再创建一个a,那么会一直采集,采集报错,其他文件采集不了了,重启也会去采集报错。只能去把a文件删除才能修复。
3.4 实时监控目录下的多个追加文件
Exec source适用于监控一个实时追加的文件,不能实现断点续传;Spooldir Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;而Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。
需求:使用Flume监听整个目录的实时追加文件,并上传至HDFS
实现步骤
[atguigu@hadoop102 data]$ cp exec-flume-hdfs.conf taildir-flume-hdfs.conf
# 修改source配置
[atguigu@hadoop102 data]$ vim taildir-flume-hdfs.conf
---
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/data/position/position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/data/taildir/file.*
a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/data/taildir/log.txt
---
[atguigu@hadoop102 data]$ mkdir position && cd position
[atguigu@hadoop102 position]$ touch position.json
[atguigu@hadoop102 position]$ cd .. && mkdir taildir && cd taildir
[atguigu@hadoop102 taildir]$ touch file1.txt
[atguigu@hadoop102 taildir