当前位置: 首页 > news >正文

大数据课程E7——Flume的Interceptor

文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州

 ▲ 本章节目的

⚪ 了解Interceptor的概念和配置参数;

⚪ 掌握Interceptor的使用方法;

⚪ 掌握Interceptor的Host Interceptor;

⚪ 掌握Interceptor的Static Interceptor;

⚪ 掌握Interceptor的UUID Interceptor;

⚪ 掌握Interceptor的Search And Replace Interceptor;

⚪ 掌握Interceptor的Regex Filtering Interceptor;

⚪ 掌握Interceptor的Custom Interceptor;

一、Timestamp Interceptor

1. 概述

1. Timestamp Interceptor是在headers中来添加一个timestamp字段来标记数据被收集的时间。

2. Timestamp Interceptor结合HDFS Sink可以实现数据按天存储。

2. 配置属性

属性

解释

type

timestamp

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

# 给Interceptor起名

a1.sources.s1.interceptors = i1

# 指定Timestamp Interceptor

a1.sources.s1.interceptors.i1.type = timestamp

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f in.conf -

Dflume.root.logger=INFO,console

4. 数据按天存放

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = hadoop01

a1.sources.s1.port = 8090

a1.sources.s1.interceptors = i1

a1.sources.s1.interceptors.i1.type = timestamp

a1.channels.c1.type = memory

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flumedata/date=%Y-%m-%d

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.rollInterval = 3600

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f hdfsin.conf -

Dflume.root.logger=INFO,console

二、Host Interceptor

1. 概述

1. Host Interceptor是在headers中添加一个字段host。

2. Host Interceptor可以用于标记数据来源于哪一台主机。

2. 配置属性

属性

解释

type

必须是host

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

# 给Interceptor起名

a1.sources.s1.interceptors = i1 i2

# 指定Timestamp Interceptor

a1.sources.s1.interceptors.i1.type = timestamp

# 指定Host Interceptor

a1.sources.s1.interceptors.i2.type = host

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f in.conf -

Dflume.root.logger=INFO,console

三、Static Interceptor

1. 概述

1. Static Interceptor是在headers中添加指定字段。

2. 可以利用这个Interceptor来标记数据的类型。

2. 配置属性

属性

解释

type

必须是static

key

指定在headers中的字段名

value

指定在headers中的字段值

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

# 给Interceptor起名

a1.sources.s1.interceptors = i1 i2 i3

# 指定Timestamp Interceptor

a1.sources.s1.interceptors.i1.type = timestamp

# 指定Host Interceptor

a1.sources.s1.interceptors.i2.type = host

# 指定Static Interceptor

a1.sources.s1.interceptors.i3.type = static

a1.sources.s1.interceptors.i3.key = kind

a1.sources.s1.interceptors.i3.value = log

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f in.conf -

Dflume.root.logger=INFO,console

四、UUID Interceptor

1. 概述

1. UUID Interceptor是在headers中添加一个id字段。

2. 可以用于标记数据的唯一性。

2. 配置属性

属性

解释

type

必须是org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

# 给Interceptor起名

a1.sources.s1.interceptors = i1 i2 i3 i4

# 指定Timestamp Interceptor

a1.sources.s1.interceptors.i1.type = timestamp

# 指定Host Interceptor

a1.sources.s1.interceptors.i2.type = host

# 指定Static Interceptor

a1.sources.s1.interceptors.i3.type = static

a1.sources.s1.interceptors.i3.key = kind

a1.sources.s1.interceptors.i3.value = log

# 指定UUID Interceptor

a1.sources.s1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f in.conf -

Dflume.root.logger=INFO,console

五、Search And Replace Interceptor

1. 概述

1. Search And Replace Interceptor在使用的时候,需要指定正则表达式,会根据正则表达式的规则,将符合正则表达式的数据替换为指定形式的数据。

2. 在替换的时候,不会替换headers中的数据,而是会替换body中的数据。

2. 配置属性

属性

解释

type

必须是search_replace

searchPattern

指定要匹配的正则形式

replaceString

指定要替换的字符串

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = http

a1.sources.s1.port = 8090

# 给拦截器起名

a1.sources.s1.interceptors = i1

# 指定类型

a1.sources.s1.interceptors.i1.type = search_replace

a1.sources.s1.interceptors.i1.searchPattern = [0-9]

a1.sources.s1.interceptors.i1.replaceString = *

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f searchin.conf -

Dflume.root.logger=INFO,console

六、Regex Filtering Interceptor

1. 概述

1. Regex Filtering Interceptor在使用的时候需要指定正则表达式。

2. 属性excludeEvents的值如果不指定,默认是false。

3. 如果没有配置excludeEvents的值或者配置excludeEvents的值配置为false,则只有符合正则表达式的数据会留下来,其他不符合正则表达式的数据会被过滤掉;如果excludeEvents的值,那么符合正则表达式的数据会被过滤掉,其他的数据则会被留下来。

2. 配置属性

属性

解释

type

必须是regex_filter

regex

指定正则表达式

excludeEvents

true或者false

3. 案例

1. 编写格式文件,添加如下内容:

# 定义 数据源(输入端) 缓冲区 输出源(输出端)

a1.sources = r1

a1.channels = c1

a1.sinks = k1

# 输入端

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /opt/upload

a1.sources.r1.fileSuffix = .done

# 拦截器

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = regex_filter

#全部都是符合条件的数据

a1.sources.r1.interceptors.i1.regex = ^.*INFO.*$

#排除符合正则表达式的数据

# a1.sources.r1.interceptors.i1.excludeEvents = true

# 输出端

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://flume45:9000/interceptors/%Y%m%d/%H

#是否使用本地时间戳

a1.sinks.k1.hdfs.useLocalTimeStamp = true

# 序列化

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.rollInterval = 0

# 使用一个在内存中缓冲事件的通道

a1.channels.c1.type = memory

# 连接通道

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f regexin.conf -

Dflume.root.logger=INFO,console

七、Custom Interceptor

1. 概述

1. 在Flume中,也允许自定义拦截器。但是不同于其他组件,自定义Interceptor的时候,需要再额外覆盖其中的内部接口。

2. 步骤:

a. 构建Maven工程,导入对应的依赖。

b. 自定义一个类实现Interceptor接口,覆盖其中initialize,intercept和close方法。

c. 定义静态内部类,实现Interceptor.Builder内部接口。

d. 打成jar包方法Flume安装目录的lib目录下。

e. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

# 指定拦截器

a1.sources.s1.interceptors = i1

a1.sources.s1.interceptors.i1.type = cn.tedu.flume.interceptor.AuthInterceptor$Builder

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

f. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f authin.conf -

Dflume.root.logger=INFO,console

http://www.lryc.cn/news/104226.html

相关文章:

  • P2P网络NAT穿透原理(打洞方案)
  • Gof23设计模式之桥接外观模式
  • 微服务性能分析工具 Pyroscope 初体验
  • 工作记录------单元测试(持续更新)
  • C#再windowForm窗体中绘画扇形并给其填充颜色
  • MBA拓展有感-见好就收,还是挑战到底?MBA拓展有感-见好就收,还是挑战到底?
  • 综合布线系统光缆分类及其特点?
  • 前端构建(打包)工具发展史
  • 【数据可视化】(一)数据可视化概述
  • GoogleLeNet Inception V2 V3
  • 【css】背景图片附着
  • 解决运行flutter doctor --android-licenses时报错
  • 在使用Python爬虫时遇到503 Service Unavailable错误解决办法汇总
  • 小研究 - 主动式微服务细粒度弹性缩放算法研究(一)
  • 【LeetCode】215.数组中的第K个最大元素
  • MySQL学习记录:第七章 存储过程和函数
  • Docker中gitlab以及gitlab-runner的安装与使用
  • 一起学SF框架系列5.12-spring-beans-数据绑定dataBinding
  • 火热报名中 | 赛宁独家技术支持第七届“蓝帽杯”网络安全技能大赛
  • 无涯教程-jQuery - Ajax Tutorial函数
  • Android日志
  • 【Golang 接口自动化08】使用标准库httptest完成HTTP请求的Mock测试
  • SpringBoot自定义注解 + AOP+分布式Redis 防止重复提交
  • 3.yum安装分布式LNMP--剧本
  • 论文笔记:Fine-Grained Urban Flow Prediction
  • 系统集成|第八章(笔记)
  • 【分布式】分布式唯一 ID 的 几种生成方案以及优缺点snowflake优化方案
  • FFmpeg5.0源码阅读——av_interleaved_write_frame
  • 力扣 70. 爬楼梯
  • AVFoundation - 媒体捕捉