当前位置: 首页 > news >正文

【大数据之Flume】七、Flume进阶之自定义Sink

(1)概述:
  Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

  Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。

  Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。

  自定义 sink 的接口:https://flume.apache.org/releases/content/1.11.0/FlumeDeveloperGuide.html#sink

  MySink 需要继承 AbstractSink 类并实现 Configurable 接口。

  实现相应方法:
    configure(Context context)//初始化 context(读取配置文件内容)
    process()//从 Channel 读取获取数据(event),这个方法将被循环调用。

  适用于:读取 Channel 数据写入 MySQL 或者其他文件系统。

(2)需求:
  使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。

(3)分析:
在这里插入图片描述

步骤:
(1)创建一个 maven 项目,并引入以下pom依赖。

<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version>
</dependency>

(2)自定义MySink ,继承 AbstractSink 类并实现 Configurable 接口,并打包,将jar包放到/opt/module/flume-1.9.0/lib目录下。

package com.study.sink;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable
{//声明前后缀private String perfix;private String subfix;//创建logger对象用于打印到控制台private Logger logger = LoggerFactory.getLogger(MySink.class);@Overridepublic void configure(Context context) {perfix = context.getString("per","per-");subfix = context.getString("sub","-sub");}@Overridepublic Status process() throws EventDeliveryException {//1.获取channel并开启事务Channel channel = getChannel();Transaction transaction = channel.getTransaction();transaction.begin();//2.从channel中抓取数据打印到控制台try {//2.1抓取数据//创建事件Event event;while(true){//防止空数据event  = channel.take();if (event != null)break;}//2.2处理事件logger.info(perfix+new String(event.getBody())+subfix);//2.3提交事务transaction.commit();return Status.READY;} catch (Exception e) {e.printStackTrace();//回滚transaction.rollback();return Status.BACKOFF;} finally {transaction.close();}}
}

(3)在/opt/module/flume-1.9.0/job下创建文件夹group6,在该文件夹下创建配置文件netcat-flume-mysink.conf。

# Name the components on this agent 
#组件声明
a1.sources = r1
a1.sinks = k1 
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port =44444# Describe the sink 
a1.sinks.k1.type = com.study.sink.MySink
a1.sinks.per = per
a1.sinks.sub = sub# Use a channel whichbuffers events in memory
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(4)开启任务

bin/flume-ng agent -c conf/ -n a1 job/group6/netcat-flume-mysink.conf -Dflume.root.logger=INFO,console

(5)开启端口并发送消息

nc localhost 44444

(6)结果
在这里插入图片描述

http://www.lryc.cn/news/118029.html

相关文章:

  • vue对于时间的处理
  • Apache DolphinScheduler 3.1.8 版本发布,修复 SeaTunnel 相关 Bug
  • 科技云报道:一波未平一波又起?AI大模型再出邪恶攻击工具
  • 深度对话|如何设计合适的网络经济激励措施
  • opencv带GStreamer之Windows编译
  • Java并发编程之锁的升级
  • 多核异构处理器A核与M核通信过程
  • 面试热题(反转链表)
  • 竞赛项目 深度学习的水果识别 opencv python
  • Java项目部署云windows细节
  • 软件功能测试有什么注意事项?功能测试报告起到什么作用?
  • Kubernetes 调度 约束
  • Grafana技术文档-概念-《十分钟扫盲》
  • 【JavaEE进阶】Spring 更简单的读取和存储对象
  • KafKa集群搭建和知识点
  • 剑指 Offer 56 - I. 数组中数字出现的次数题解
  • CSDN付费专栏写作协议
  • [保研/考研机试] KY30 进制转换-大整数转二进制 清华大学复试上机题 C++实现
  • vue3多条件搜索功能
  • C++20协程
  • Zabbix 6.0 监控其他
  • Django rest_framework Serializer中的create、Views中的create/perform_create的区别
  • 差异性分析傻瓜版
  • Keystone Automotive EDI 需求分析
  • jmeter创建一个压测项目
  • CEC2013(MATLAB):淘金优化算法GRO求解CEC2013的28个函数
  • AI Deep Reinforcement Learning Autonomous Driving(深度强化学习自动驾驶)
  • Java super
  • 【人工智能前沿弄潮】——生成式AI系列:Diffusers学习(1)了解Pipeline 、模型和scheduler
  • TypeScript 非空断言