当前位置: 首页 > news >正文

Kafka集成flume

1.flume作为生产者集成Kafka

        kafka作为flume的sink,扮演消费者角色

     1.1 flume配置文件

        vim $kafka/jobs/flume-kafka.conf

# agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1 c2# Describe/configure the source
a1.sources.r1.type = TAILDIR
#记录最后监控文件的断点的文件,此文件位置可不改
a1.sources.r1.positionFile =  /export/server/flume/job/data/tail_dir.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /export/server/flume/job/data/.*file.*
a1.sources.r1.filegroups.f2 =/export/server/flume/job/data/.*log.*# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = customers
a1.sinks.k1.kafka.bootstrap.servers =node1:9092,node2:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

1.2开启flume监控

flume-ng agent -n a1 -c conf/ -f /export/server/kafka/jobs/kafka-flume.conf

1.3开启Kafka消费者

kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092 --topic consumers --from-beginning

1.4生产数据

往被监控文件输入数据

[ljr@node1 data]$echo hello >>file2.txt 
[ljr@node1 data]$ echo ============== >>file2.txt 

查看Kafka消费者

可见Kafka集成flume生产者成功。

2.flume作为消费者集成Kafka

        kafka作为flume的source,扮演生产者角色

2.1flume配置文件

vim $kafka/jobs/flume-kafka.conf


# agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
#注意不要大于channel transactionCapacity的值100
a1.sources.r1.batchSize = 50 
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers =node1:9092, node1:9092
a1.sources.r1.kafka.topics = consumers
a1.sources.r1.kafka.consumer.group.id = custom.g.id# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
#注意transactionCapacity的值不要小于sources batchSize的值50
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.2开启flume监控

flume-ng agent -n a1 -c conf/ -f /export/server/kafka/jobs/kafka-flume1.conf

2.3开启Kafka生产者并生产数据

kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092 --topic consumers

 

查看flume监控台

可见Kafka集成flume消费者成功。

http://www.lryc.cn/news/374767.html

相关文章:

  • 如何让视频有高级感 高级感视频制作方法 高级感视频怎么剪 会声会影视频剪辑制作教程 会声会影中文免费下载
  • 详解|访问学者申请被拒原因有哪些?
  • [鹤城杯 2021]BabyRSA
  • 西安市工业倍增引导基金子基金申报条件流程和材料程序指南(2024年)
  • 微型丝杆的耐用性和延长使用寿命的关键因素!
  • 音频文件下载后,如何轻松转换格式?
  • Intel平台,13600KF+3060Ti,虚拟机安装macOS 14(2024年6月)
  • Cookie、Session、Token的关系和区别
  • Windows 11 中安装 Docker Desktop 并安装镜像
  • 深入剖析Java线程池之“newWorkStealingPool“
  • 《跟我一起学“网络安全”》——安全设备
  • 猜测Tomcat如何实现WebSocket协议
  • uniApp @input事件更改输入框值,值改变了但是页面没更新新的值
  • 两行css 实现瀑布流
  • Centos7.9部署单节点K8S环境
  • 【CV】stable diffusion初步理解
  • 足底筋膜炎最好的恢复办法
  • Fiddler抓包工具介绍
  • 知乎号开始运营了,宣传一波
  • Go 基础丨切片 slice
  • 哪个牌子充电宝好用?精选四大热门款充电宝品牌!公认好用
  • WPF/C#:如何将数据分组显示
  • leetcode 200 岛屿数量
  • ​1:25万基础电子地图(江西版)
  • 【RabbitMQ】初识 RabbitMQ
  • Qt QListView自定义树状导航控件
  • Java 数组的全面解析与应用
  • Thinkphp起名网宝宝起名网站源码
  • 【解决方案】【最佳实践】React高阶组件中Refs 不会被传递的问题
  • SRAM和DRAM