当前位置: 首页 > news >正文

Flume学习笔记(3)—— Flume 自定义组件

前置知识:

Flume学习笔记(1)—— Flume入门-CSDN博客

Flume学习笔记(2)—— Flume进阶-CSDN博客

Flume 自定义组件

自定义 Interceptor

需求分析:使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统

需要使用Flume 拓扑结构中的 Multiplexing 结构,Multiplexing的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值

实现流程:

代码

导入依赖:

<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency>
</dependencies>

自定义拦截器的代码:

package com.why.interceptor;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class TypeInterceptor implements Interceptor {//存放事件的集合private List<Event> addHeaderEvents;@Overridepublic void initialize() {//初始化集合addHeaderEvents = new ArrayList<>();}//单个事件拦截@Overridepublic Event intercept(Event event) {//获取头信息Map<String, String> headers = event.getHeaders();//获取body信息String body = new String(event.getBody());//根据数据中是否包含”why“来分组if(body.contains("why")){headers.put("type","first");}else {headers.put("type","second");}return event;}//批量事件拦截@Overridepublic List<Event> intercept(List<Event> events) {//清空集合addHeaderEvents.clear();//遍历eventsfor(Event event : events){//给每一个事件添加头信息addHeaderEvents.add(intercept(event));}return addHeaderEvents;}@Overridepublic void close() {}//构建生成器public static class TypeBuilder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new TypeInterceptor();}@Overridepublic void configure(Context context) {}}}

将代码打包放入flume安装路径下的lib文件夹中

配置文件

在job文件夹下创建group4目录,添加配置文件;

为 hadoop102 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.why.interceptor.TypeInterceptor$TypeBuilder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.first = c1
a1.sources.r1.selector.mapping.second = c2# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

hadoop103:配置一个 avro source 和一个 logger sink

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

hadoop104:配置一个 avro source 和一个 logger sink

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

执行指令

hadoop102:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume-interceptor-flume.conf

hadoop103:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1-flume-logger.conf -Dflume.root.logger=INFO,console

hadoop104:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume2-flume-logger.conf -Dflume.root.logger=INFO,console

然后hadoop102通过nc连接44444端口,发送数据:

在hadoop103和104上分别接收到:

自定义 Source

官方提供的文档:Flume 1.11.0 Developer Guide — Apache Flume

给出的示例代码如下:

public class MySource extends AbstractSource implements Configurable, PollableSource {private String myProp;@Overridepublic void configure(Context context) {String myProp = context.getString("myProp", "defaultValue");// Process the myProp value (e.g. validation, convert to another type, ...)// Store myProp for later retrieval by process() methodthis.myProp = myProp;}@Overridepublic void start() {// Initialize the connection to the external client}@Overridepublic void stop () {// Disconnect from external client and do any additional cleanup// (e.g. releasing resources or nulling-out field values) ..}@Overridepublic Status process() throws EventDeliveryException {Status status = null;try {// This try clause includes whatever Channel/Event operations you want to do// Receive new dataEvent e = getSomeData();// Store the Event into this Source's associated Channel(s)getChannelProcessor().processEvent(e);status = Status.READY;} catch (Throwable t) {// Log exception, handle individual exceptions as neededstatus = Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error)t;}} finally {txn.close();}return status;}
}

需要继承AbstractSource,实现Configurable, PollableSource

实战需求分析

使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置

代码

package com.why.source;import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;import java.util.HashMap;
import java.util.concurrent.ConcurrentMap;public class MySource extends AbstractSource implements PollableSource, Configurable {//定义配置文件将来要读取的字段private Long delay;private String field;//获取数据封装成 event 并写入 channel,这个方法将被循环调用@Overridepublic Status process() throws EventDeliveryException {try {//事件头信息HashMap<String,String> headerMap = new HashMap<>();//创建事件SimpleEvent event = new SimpleEvent();//循环封装事件for (int i = 0; i < 5; i++) {//设置头信息event.setHeaders(headerMap);//设置事件内容event.setBody((field + i).getBytes());//将事件写入ChannelgetChannelProcessor().processEvent(event);Thread.sleep(delay);}}catch (InterruptedException e) {throw new RuntimeException(e);}return Status.READY;}//backoff 步长@Overridepublic long getBackOffSleepIncrement() {return 0;}//backoff 最长时间@Overridepublic long getMaxBackOffSleepInterval() {return 0;}//初始化 context(读取配置文件内容)@Overridepublic void configure(Context context) {delay = context.getLong("delay");field = context.getString("field","Hello");}}

打包放到flume安装路径下的lib文件夹中;

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = com.why.source.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = why# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

执行指令

hadoop102上:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group5/mysource.conf -Dflume.root.logger=INFO,console

结果如下:

自定义 Sink

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件

官方文档:Flume 1.11.0 Developer Guide — Apache Flume

接口实例:

public class MySink extends AbstractSink implements Configurable {private String myProp;@Overridepublic void configure(Context context) {String myProp = context.getString("myProp", "defaultValue");// Process the myProp value (e.g. validation)// Store myProp for later retrieval by process() methodthis.myProp = myProp;}@Overridepublic void start() {// Initialize the connection to the external repository (e.g. HDFS) that// this Sink will forward Events to ..}@Overridepublic void stop () {// Disconnect from the external respository and do any// additional cleanup (e.g. releasing resources or nulling-out// field values) ..}@Overridepublic Status process() throws EventDeliveryException {Status status = null;// Start transactionChannel ch = getChannel();Transaction txn = ch.getTransaction();txn.begin();try {// This try clause includes whatever Channel operations you want to doEvent event = ch.take();// Send the Event to the external repository.// storeSomeData(e);txn.commit();status = Status.READY;} catch (Throwable t) {txn.rollback();// Log exception, handle individual exceptions as neededstatus = Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error)t;}}return status;}
}

自定义MySink 需要继承 AbstractSink 类并实现 Configurable 接口

实战需求分析

使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置

代码

package com.why.sink;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable {//创建 Logger 对象private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);//前后缀private String prefix;private String suffix;@Overridepublic Status process() throws EventDeliveryException {//声明返回值状态信息Status status;//获取当前 Sink 绑定的 ChannelChannel ch = getChannel();//获取事务Transaction txn = ch.getTransaction();//声明事件Event event;//开启事务txn.begin();//读取 Channel 中的事件,直到读取到事件结束循环while (true) {event = ch.take();if (event != null) {break;}}try {//处理事件(打印)LOG.info(prefix + new String(event.getBody()) + suffix);//事务提交txn.commit();status = Status.READY;} catch (Exception e) {//遇到异常,事务回滚txn.rollback();status = Status.BACKOFF;} finally {//关闭事务txn.close();}return status;}@Overridepublic void configure(Context context) {prefix = context.getString("prefix", "hello");suffix = context.getString("suffix");}
}

打包放到flume安装路径下的lib文件夹中;

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = com.why.sink.MySink
a1.sinks.k1.prefix = why:
a1.sinks.k1.suffix = :why# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

执行指令

hadoop102上:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group6/mysink.conf -Dflume.root.logger=INFO,console

结果如下:

http://www.lryc.cn/news/235956.html

相关文章:

  • go的字符切片和字符串互转
  • 所见即所得的动画效果:Animate.css
  • ERR:Navicat连接Sql Server报错
  • python算法例10 整数转换为罗马数字
  • springboot引入第三方jar包放到项目目录中,添加web.xml
  • 大数据研发工程师课前环境搭建
  • Qt图形视图框架:QGraphicsItem详解
  • defer和async
  • 数电实验-----实现74LS139芯片扩展为3-8译码器以及应用(Quartus II )
  • 洋葱架构、三层架构及两者区别
  • JavaEE进阶学习:Spring 的创建和使用
  • 音视频项目—基于FFmpeg和SDL的音视频播放器解析(十四)
  • Tomcat无法映射到activiti-app导致activiti无法启动页面
  • c语言常见的面试问题
  • image图片之间的间隙消除
  • asp.net心理健康管理系统VS开发sqlserver数据库web结构c#编程计算机网页项目
  • CnosDB有主复制演进历程
  • 【前沿学习】美国零信任架构发展现状与趋势研究
  • Toolformer论文阅读笔记(简略版)
  • Pytorch torch.dot、torch.mv、torch.mm、torch.norm的用法详解
  • Jave 定时任务:使用Timer类执行定时任务为何会发生任务阻塞?如何解决?
  • Visual Studio Code配置c/c++环境
  • 漏洞利用工具的编写
  • ChatGPT之父被OpenAI解雇
  • linux中利用fork复制进程,printf隐藏的缓冲区,写时拷贝技术,进程的逻辑地址与物理地址
  • java游戏制作-拼图游戏
  • 使用sklearn报AttributeError: ‘NoneType‘ object has no attribute ‘split‘
  • C++学习 --map
  • 基于Qt QList和QMap容器类示例
  • Flask学习一:概述