当前位置: 首页 > news >正文

日志采集传输框架之 Flume,将监听端口数据发送至Kafka

1、简介        

        Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传
输的系统。Flume 基于流式架构,主要有以下几个部分组成。

 主要组件介绍:

1)、Flume Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。Agent 主要有 3 个部分组成, Source 、 Channel 、 Sink 。

2)、Source 是负责接收数据到 Flume Agent 的组件。 Source 组件可以处理各种类型、各种
格式的日志数据,包括 avro 、 thrift 、 exec 、 jms 、netcat 、 taildir 、syslog 、 http 。

3)、Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent 。Sink 组件目的地包括 hdfs、logger、avro 、 thrift、file 、 HBase 、 solr 、自定义。

4)、Channel 是位于 Source 和 Sink 之间的缓冲区。因此, Channel 允许 Source 和 Sink 运作在不同的速率上。 Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作。Flume 自带两种 Channel Memory Channel File Channel
        Memory Channel:是内存中的队列。 Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
        File Channel:将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

5)、Event 传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。Event 由 Header 和 Body 两部分组成, Header 用来存放该 event 的一些属性,为 K-V 结构、Body 用来存放该条数据,形式为字节数组。

2、下载安装

下载地址:Download — Apache Flume

本文下载安装包为:apache-flume-1.11.0-bin.tar.gz

解压安装

# 解压到指定目录
tar -zxvf apache-flume-1.11.0-bin.tar.gz -C /opt/software
 3、案例        

        使用监听端口数据,发送到kafka中。Kafka 集群搭建启动参考博客:kafka丢弃zookeeper,使用kraft架构-CSDN博客

3.1、创建Flume agent 配置文件 vim flume-netcat-kafka.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.30.90
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic1
a1.sinks.k1.kafka.bootstrap.servers = 192.168.30.88:9092,192.168.30.89:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3.2、安装 netcat 工具
# 安装 netcat 工具
yum install -y nc
3.3、启动flume
# 1、第一种方式
bin/flume-ng agent --conf conf/ --name a1 --conf-file test/flume-netcat-kafka.conf -
Dflume.root.logger=INFO,console
# 2、第二种方式
bin/flume-ng agent -c conf/ -n a1 -f test/flume-netcat-kafka.conf -Dflume.root.logger=INFO,console

参数说明:

1)、--conf/-c:表示配置文件存储在conf/目录;
2)、--name/-n:表示给agent 起名为a1;
3)、--conf-file/-f:flume 本次启动读取的配置文件是在 test 文件夹下的 flume-netcat-kafka.conf 文件。
4)、-Dflume.root.logger=INFO,console :-D 表示flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为INFO 级别。日志级别包括:log、info、warn、error。

3.4、测试
3.4.1、启动监听44444端口
nc 192.168.30.90 44444
3.4.2、启动 Kafka 消费者控制台
bin/kafka-console-consumer.sh --bootstrap-server 192.168.30.88:9092 --topic topic1 --from-beginning
3.4.3、结果展示

4、总结 

        本文详细介绍 flume 组件各个模块的含义,并且实现从监听端口数据发送至Kafka案例,帮助大家快速掌握flume的使用。关于flume 如何自定义 Source 和 Sink,将在后续博客中更新。

        本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

http://www.lryc.cn/news/281432.html

相关文章:

  • 关于Vue前端接口对接的思考
  • 【设计模式之美】SOLID 原则之三:里式替换(LSP)跟多态有何区别?如何理解LSP中子类遵守父类的约定
  • 代码随想录第六十三天——被围绕的区域,太平洋大西洋水流问题,最大人工岛
  • Docker 项目如何使用 Dockerfile 构建镜像?
  • 实践学习PaddleScience飞桨科学工具包
  • Vue 中修改 Element 组件的 下拉菜单(Dropdown) 的样式
  • 达梦数据库主备集群
  • Spark Doris Connector 可以支持通过 Spark 读取 Doris 数据类型不兼容报错解决
  • 深入理解 go chan
  • java+vue基于Spring Boot的渔船出海及海货统计系统
  • Linux第25步_在虚拟机中备份“ST官方的TF-A源码”
  • 统计学-R语言-4.1
  • C++(1) —— 基础语法入门
  • 构建基于RHEL8系列(CentOS8,AlmaLinux8,RockyLinux8等)的支持63个常见模块的PHP8.1.20的RPM包
  • Vue-插槽(Slots)
  • 新火种AI|GPT-5前瞻!GPT-5将具备哪些新能力?
  • 安防视频监控系统EasyCVR设备分组中在线/离线数量统计的开发与实现
  • spring cloud之集成sentinel
  • 让车辆做到“耳听八方”,毫米波雷达芯片与系统设计
  • Python如何实现数据驱动的接口自动化测试
  • 高级分布式系统-第15讲 分布式机器学习--联邦学习
  • 小程序基础学习(事件处理)
  • 网络协议与攻击模拟_01winshark工具简介
  • 【linux学习笔记】网络
  • JUC-线程中断机制和LockSupport
  • 哈希表与哈希算法(Python系列30)
  • 『 C++ 』AVL树详解 ( 万字 )
  • Python下载安装pip方法与步骤_pip国内镜像
  • 自动化测试框架pytest系列之基础概念介绍(一)
  • 编码技巧:如何在Golang中高效解析和生成XML