当前位置: 首页 > news >正文

Flume拦截器的实现

Flume conf文件编写

vim file_to_kafka.conf
#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /Users/zhangjin/model/project/realtime-flink/applog/log/app.*
# 设置断点续传的位置
a1.sources.r1.positionFile = /Users/zhangjin/model/flume/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.flume.interceptor.ETLInterceptor$Builder#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = localhost:9092
a1.channels.c1.kafka.topic = topic_log
# 设置不以Flume event 写入数据,以Body数据进行写入
a1.channels.c1.parseAsFlumeEvent = false#组装
a1.sources.r1.channels = c1

Flume ETLInterceptor拦截器的编写

maven依赖

	<dependencies><!--Flume依赖 --><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><!--Json格式校验--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency></dependencies>

maven package打包依赖

    <build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>

判断是否是JSON字符串

public class JSONUtil {/** 通过异常判断是否是json字符串* 是:返回true  不是:返回false* */public static boolean isJSONValidate(String log){try {JSONObject.parseObject(log);return true;}catch (JSONException e){return false;}}
}

拦截器实现

  1. 继承Interceptor接口
  2. 实现单event处理
  3. 实现批量event处理
public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}/*** 单个event处理* 检验是否是Json格式* @param event* @return*/@Overridepublic Event intercept(Event event) {//1 获取json数据byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);//2 校验json数据if (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}/*** 多个event处理* @param list* @return*/@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()) {Event event = iterator.next();if (intercept(event) == null) {iterator.remove();}}return list;}@Overridepublic void close() {}/*** 拦截器重写Builder方法*/public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}
}

测试

maven package打包,将生成的jar包放在了Flume的lib目录下
启动kafka

# 启动命令
./bin/kafka-server-start.sh -daemon ./config/server.properties &# 开启消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_log

启动Flume

bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
http://www.lryc.cn/news/514853.html

相关文章:

  • Swift Combine 学习(四):操作符 Operator
  • leetcode 173.二叉搜索树迭代器栈绝妙思路
  • df.groupby([pd.Grouper(freq=‘1M‘, key=‘Date‘), ‘Buyer‘]).sum()
  • LLM - 使用 LLaMA-Factory 部署大模型 HTTP 多模态服务 (4)
  • icp备案网站个人备案与企业备案的区别
  • 如何不修改模型参数来强化大语言模型 (LLM) 能力?
  • AF3 AtomAttentionEncoder类的init_pair_repr方法解读
  • DDoS攻击防御方案大全
  • Vue中常用指令
  • Servlet解析
  • 带虚继承的类对象模型
  • 深度学习中的离群值
  • 如何利用Logo设计免费生成器创建专业级Logo
  • Mysql SQL 超实用的7个日期算术运算实例(10k)
  • 运算指令(PLC)
  • 「Mac畅玩鸿蒙与硬件49」UI互动应用篇26 - 数字填色游戏
  • 机器学习经典算法——逻辑回归
  • 【数据仓库金典面试题】—— 包含详细解答
  • 【UE5 C++课程系列笔记】19——通过GConfig读写.ini文件
  • JS 中 json数据 与 base64、ArrayBuffer之间转换
  • USB 驱动开发 --- Gadget 驱动框架梳理
  • 细说STM32F407单片机中断方式CAN通信
  • Python应用指南:高德交通态势数据
  • 医学图像分析工具01:FreeSurfer || Recon -all 全流程MRI皮质表面重建
  • .NET框架用C#实现PDF转HTML
  • mamba-ssm安装
  • 网络IP协议
  • 双指针算法详解
  • MySQL的最左匹配原则是什么
  • LeetCode:106.从中序与后序遍历序列构造二叉树