当前位置: 首页 > news >正文

大数据之Flume

Flume

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。

在这里插入图片描述

架构图

在这里插入图片描述

一、Flume基础架构

1.1 Agent

Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。

Agent主要有3个部分组成,Source、Channel、Sink。

1.2 Source

Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、 taildir 、sequence generator、syslog、http、legacy。

1.3 Sink

Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。

Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。

1.4 Channel

Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。

Flume自带两种Channel:Memory Channel和File Channel。

Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。

File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

1.5 Event

传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由**HeaderBody**两部分组成,Header用来存放该event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组。

在这里插入图片描述

二、Flume安装

2.1 安装地址

  • Flume官网地址:http://flume.apache.org/
  • 文档查看地址:http://flume.apache.org/FlumeUserGuide.html
  • 下载地址:http://archive.apache.org/dist/flume/

2.2 安装部署

  1. 将apache-flume-1.9.0-bin.tar.gz上传到linux的/opt/software目录下

  2. 解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下

    tar -zxvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
    
  3. 修改apache-flume-1.9.0-bin的名称为flume

    mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume-1.9.0
    
  4. 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3

    rm /opt/module/flume-1.9.0/lib/guava-11.0.2.jar
    
  5. 环境变量

    [root@hadoop102 flume-1.9.0]# vim /etc/profile.d/my_env.sh---
    FLUME_HOME=/opt/module/flume-1.9.0
    PATH=$PATH:$FLUME_HOME/binexport FLUME_HOME
    ---
    [atguigu@hadoop102 flume-1.9.0]$ source /etc/profile.d/my_env.sh
    

三、Flume入门案例

3.1 监控端口数据官方案例

需求:使用Flume监听一个端口,收集该端口数据,并打印到控制台

在这里插入图片描述

https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#a-simple-example

实现步骤

[atguigu@hadoop102 ~]$ cd /opt/module/flume-1.9.0/
[atguigu@hadoop102 flume-1.9.0]$ mkdir data && cd data
[atguigu@hadoop102 data]$ vim netcat-flume-logger.conf
---
# 给Agent各个组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 配置Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# 配置Channel 
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 配置Sink
a1.sinks.k1.type = logger# 组合
a1.sources.r1.channels = c1
a1.sources.k1.channel = c1# ChannelSelector Sink Processor Interceptor
---# flume-ng 是flume bin目录下的命令, 我们已配环境变量
# --conf/-c 表示配置文件存储在conf目录下, 指定配置文件的位置, 此参数缩写是-c
# --conf-file/-f 表示flume本次启动读取的配置文件是data文件夹下的netcat-flume-logger.conf
# --name/-n 表示给agent起名为a1 (对应netcat-flume-logger.conf文件里的a1, 因为我们都是a1.sources、a1.sinks等等)
# -Dflume.root.logger=INFO,console: -D表示flume运行时动态修改flume.root.logger参数属性值,将info级别的日志打到console上
[atguigu@hadoop102 flume-1.9.0]$ flume-ng agent --conf conf --conf-file data/netcat-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
# 可以看到Flume启动在44444

在这里插入图片描述
6%A1%88%E4%BE%8B-Flume%E5%90%AF%E5%8A%A8%E5%9B%BE.png&pos_id=img-GYuqX8A0-1754486100991)

# 另开窗口使用nc连上44444
[atguigu@hadoop102 ~]$ nc localhost 44444
hello
nihao
111
222
333
byte
byebye

在这里插入图片描述

3.2 实时监控单个追加文件

需求:实时监控Hive日志,并上传到HDFS中

在这里插入图片描述

实现步骤

[atguigu@hadoop102 data]$ vim exec-flume-logger.conf
---
# 给Agent各个组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Sink
a1.sinks.k1.type = logger# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
---[atguigu@hadoop102 flume-1.9.0]$ flume-ng agent -c conf -f data/exec-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console

在这里插入图片描述

测试手动追加

[atguigu@hadoop102 logs]$ echo aaa >> hive.log 
[atguigu@hadoop102 logs]$ echo bbb >> hive.log 
[atguigu@hadoop102 logs]$ 

测试自动追加

[atguigu@hadoop102 ~]$ beeline -u jdbc:hive2://hadoop102:10000 -n atguigu
0: jdbc:hive2://hadoop102:10000> use default
# 无论是正常日志还是错误日志都会被flume采集到,这样就实现了从文件里采集追加的数据到我们的控制台。
# flume命令后跟的Info只是说Flume运行日志的Info级别而非监控的文件(hive.log)里数据, Hive的Error打印了,是因为Hive的日志级别导致Error打印到hive.log
# 从而被flume采集, 只要数据到hive.log flume就采集, hive.log里有啥和flume没关系

接下来实现上传到HDFS

[atguigu@hadoop102 data]$ cp exec-flume-logger.conf exec-flume-hdfs.conf
# 改一下sink
[atguigu@hadoop102 data]$ vim exec-flume-hdfs.conf
---
# Sink
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a1.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a1.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k2.hdfs.rollCount = 0
---
[atguigu@hadoop102 flume-1.9.0]$ flume-ng agent -c conf -f data/exec-flume-hdfs.conf -n a1

在这里插入图片描述

正在使用的文件后缀为.tmp,可改

注意

对于所有与时间相关的转义序列,Event Header中必须存在以 “timestamp”的key(除非hdfs.useLocalTimeStamp设置为true,此方法会使用TimestampInterceptor自动添加timestamp)。

  • type: 组件类型名称,hdfs

  • hdfs.path: 捡来把数据写到HDFS的什么位置

    采集数据后,把数据写到HDFS某个位置,我是一直往某个文件里写吗?还是一个Event写一个文件呢?

    如果一直往一个文件里写,这个文件就会越来越大,不合适;一个Event一个文件呢,这又会造成很多小文件的场景。

  • hdfs.filePrefix、hdfs.fileSuffix: 将来你在HDFS创建目录的时候,你可以指定一个文件的前缀、后缀标识

  • hdfs.inUsePrefix、hdfs.inUseSuffix: 将来给你正在使用的文件加前缀、后缀标识,inUseSuffix默认是.tmp

  • hdfs.rollInterval: 滚动时间间隔。往HDFS写数据的时候,多少秒给你生成一个文件,例如30秒一个文件。时间层面

  • hdfs.rollSize: 文件达到多大的时候触发滚动,例如当文件达到1024 byte的时候去触发滚动。大小层面

  • hdfs.rollCount: 写进去多少个时间的时候触发滚动,例如写了10个Event的时候触发滚动。一般不用。

    一条数据封装为一个Event,这个大小不好衡量,通常情况下1~2KB。但特殊情况Event大小不确定,就不能通过事件的个数去断定数据有多少。所以一般根据时间层面或大小层面去进行滚动。例如让他128M滚动一次。

  • hdfs.round、hdfs.roundValue、hdfs.roundUnit: 这个是对于文件夹的滚动,它可以不用一直往一个目录下写文件,可以往多个目录下写文件。round是bool,是否滚动,roundValue是值,roundUnit是单位,例如roundValue是1,roundUnit是second,那就是1秒钟滚动一次。

  • hdfs.useLocalTimeStamp: 本地时间戳,经常用,因为滚动涉及到时间层面

3.3 实时监控目录下多个文件

需求:使用Flume监听整个目录的文件,并上传至HDFS

在这里插入图片描述

[atguigu@hadoop102 data]$ cp exec-flume-hdfs.conf spooling-flume-hdfs.conf
# 改一下source
[atguigu@hadoop102 data]$ vim spooling-flume-hdfs.conf
---
# Source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/module/flume-1.9.0/data/spoolingDir
a1.sources.r1.fileSuffix = .COMPLETED
#忽略所有以.tmp结尾的文件,不上传
a1.sources.r1.ignorePattern = ([^ ]*\.tmp)
---[atguigu@hadoop102 data]$ mkdir spoolingDir && cd spoolingDir
[atguigu@hadoop102 flume-1.9.0]$ flume-ng agent -c conf -f data/spooling-flume-hdfs.conf -n a1
[atguigu@hadoop102 data]$ cp exec-flume-hdfs.conf ./spoolingDir/
[atguigu@hadoop102 data]$ ls spoolingDir/
exec-flume-hdfs.conf.COMPLETED

说明:在使用Spooling Directory Source时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED结尾;被监控文件夹每500毫秒扫描一次文件变动。完全以后缀区分,如果直接创建.COMPLETED后缀的文件,它不会再去采集。如果a文件采集过了变为了a.COMPLETED,再创建一个a,那么会一直采集,采集报错,其他文件采集不了了,重启也会去采集报错。只能去把a文件删除才能修复。

3.4 实时监控目录下的多个追加文件

Exec source适用于监控一个实时追加的文件,不能实现断点续传;Spooldir Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;而Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。

需求:使用Flume监听整个目录的实时追加文件,并上传至HDFS

在这里插入图片描述

实现步骤

[atguigu@hadoop102 data]$ cp exec-flume-hdfs.conf taildir-flume-hdfs.conf
# 修改source配置
[atguigu@hadoop102 data]$ vim taildir-flume-hdfs.conf
---
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/data/position/position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/data/taildir/file.*
a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/data/taildir/log.txt
---
[atguigu@hadoop102 data]$ mkdir position && cd position
[atguigu@hadoop102 position]$ touch position.json
[atguigu@hadoop102 position]$ cd .. && mkdir taildir && cd taildir
[atguigu@hadoop102 taildir]$ touch file1.txt
[atguigu@hadoop102 taildir
http://www.lryc.cn/news/611857.html

相关文章:

  • AT32的freertos下modbus TCP移植
  • #C语言——学习攻略:探索内存函数--memcpy、memmove的使用和模拟实现,memset、memcmp函数的使用
  • flex布局:容器的justify-content属性
  • CEH、OSCP、CISP、CISSP 四大网络安全认证攻略
  • 【hot100】无重复字符的最长子串-Python3
  • duiLib 编译时复制资源目录到exe同级目录
  • 推动本地流智能:基于 Apache Kafka 与 Flink 的实时机器学习实践
  • 无需SCADA/OPC,实现直接与西门子PLC Web API通讯实现数据读写(一)
  • Mysql如何迁移数据库数据
  • 【自动驾驶】《Sparse4Dv3 Advancing End-to-End 3D Detection and Tracking》论文阅读笔记
  • 工业协议转换终极武器:EtherCAT转PROFINET网关的连接举例
  • Spring Boot全局异常处理与日志监控实战指南
  • 从Navisworks到定制化BIM系统:HOOPS Exchange如何实现高效3D格式解析?
  • 【公考】----申论篇
  • 测试单节点elasticsearch配置存储压缩后的比率
  • 20250806给PRO-RK3566开发板在Buildroot系统下扩大rootfs分区2GB
  • 移动端网页调试实战,跨设备兼容与触控交互问题排查全流程
  • Class30数据增广
  • 【docker】完整 Dockerfile 示例和构建运行指南
  • SmartX 用户建云实践|宝信软件:搭建“双架构”私有云平台,灵活满足多种业务需求
  • Bug 记录:SecureRandom.getInstanceStrong()导致验证码获取阻塞
  • Python爬虫 urllib 模块详细教程:零基础小白的入门指南
  • Unity3D水下场景与游泳系统开发指南
  • Scrapy(一):轻松爬取图片网站内容​
  • 安宝特方案丨工业AR+AI质检方案:致力于提升检测精度与流程效率
  • linux-系统性能监控
  • Python爬虫实战:研究spiderfoot工具,构建网络情报收集系统
  • python每日一题 贪心算法
  • 线程-线程池篇(二)
  • 基于Hadoop的木鸟民宿数据分析与可视化、民宿价格预测模型系统的设计与实现