当前位置: 首页 > news >正文

flume拦截器

flume拦截器代码

1.依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flume-interceptor</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

2.核心代码

package com.atguigu.gmall.flume.log.interceptor;import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;public class TimestampAndTableNameInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {Map<String, String> headers = event.getHeaders();
String log = new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(log);Long ts = jsonObject.getLong("ts");//Maxwell输出的数据中的ts字段时间戳单位为秒,Flume HDFSSink要求单位为毫秒String timeMills = String.valueOf(ts * 1000);String tableName = jsonObject.getString("table");String databaseName = jsonObject.getString("database");headers.put("timestamp", timeMills);headers.put("tableName", tableName);headers.put("databaseName", databaseName);return event;}@Overridepublic List<Event> intercept(List<Event> events) {for (Event event : events) {intercept(event);}return events;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimestampAndTableNameInterceptor ();}@Overridepublic void configure(Context context) {}}
}
http://www.lryc.cn/news/145549.html

相关文章:

  • vue、elementui控制前一级选择后,后一级才会有数据
  • 亲测influxdb安装为window后台服务
  • 【LeetCode - 每日一题】823. 带因子的二叉树 (2023.08.29)
  • flutter 上传图片并裁剪
  • 一台服务器上部署 Redis 伪集群
  • ealtek高清晰音频管理器(realtek高清晰音频管理器怎么设置win10)
  • 微信小程序 scroll-view 组件的 bindscroll 不触发不生效
  • datax 删除分区数据,再写入MySQL脚本
  • hyperf 十四 国际化
  • C语言_初识C语言指针
  • EMQX启用双向SSL/TLS安全连接以及java连接
  • 4399面试总结C/C++游戏开发
  • hashlib 模块学习
  • 大模型开发05:PDF 翻译工具开发实战
  • LeetCode 43题:字符串相乘
  • 基于java Swing 和 mysql实现的飞机订票系统(源码+数据库+ppt+ER图+流程图+架构说明+论文+运行视频指导)
  • Jmeter性能综合实战 —— 签到及批量签到
  • 燃气管网监测系统,提升城市燃气安全防控能力
  • 【SQL】1731. 每位经理的下属员工数量 ( 新思想:确定左表,依次添加后续字段)
  • AMD Radeon RX 7000/6000系列显卡安装ROCm 调用CUDA
  • 钉钉小程序引用阿里巴巴图标
  • 深入了解Nginx:高性能的开源Web服务器与反向代理
  • vue3 自定义显示内容
  • 视频行为分析——视频图像转换与ffmpeg相关操作
  • Bean 生命周期
  • JavaScript原型链污染
  • 【Java】设计模式之单例模式与工厂模式
  • web自动化框架:selenium学习使用操作大全(Python版)
  • boringssl EVP_aes_128_ecb实现
  • vxe-table中树形结构