当前位置: 首页 > news >正文

【大数据之Kafka】十六、Kafka集成外部系统之集成Flume

  Flume 是一个在大数据开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于 Kafka 的消费者。
在这里插入图片描述
Flume安装和部署:https://blog.csdn.net/qq_18625571/article/details/131678589?spm=1001.2014.3001.5501

1 Flume生产者

在这里插入图片描述
(1)在hadoop102启动Kafka集群。

zk.sh start
kf.sh start

(2)在hadoop103启动Kafka消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first

(3)在hadoop102上安装Flume:https://blog.csdn.net/qq_18625571/article/details/131678589?spm=1001.2014.3001.5501 第一章Flume安装部署。

(4)在/opt/module/flume-1.9.0/job目录下创建配置文件file_to_kafka.conf。

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 2 配置 source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(5)启动Flume

bin/flume-ng agent -c conf/ -n a1 -f job/file_to_kafka.conf

(6)向/opt/module/applog/app.log 里追加数据,查看 kafka 消费者消费情况。

mkdir applog
echo hello >> /opt/module/applog/app.log

(7)观察 kafka 消费者,能够看到消费的 hello 数据。

2 Flume消费者

在这里插入图片描述

(1)在 hadoop102 节点的 Flume 的/opt/module/flume/job 目录下创建 kafka_to_file.conf。

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 4 配置 sink
a1.sinks.k1.type = logger# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(2)启动 Flume。

bin/flume-ng agent -c conf/ -n a1 -f job/kafka_to_file.conf -Dflume.root.logger=INFO,console

(3)启动 kafka 生产者,并输入数据,例如:hello world

bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first

(4)观察控制台输出的日志

http://www.lryc.cn/news/172220.html

相关文章:

  • java学习--day3 (运算符、if循环、switch-case结构)
  • ActiveMQ、RabbitMQ、RocketMQ、Kafka区别
  • csp初赛总结 那些年编程走过的坑 初高中信竞常考语法算法点
  • DollarTree(美元树)验厂需要注意哪些方面?
  • vector使用和模拟实现
  • token登录的实现
  • GO语言从入门到实战-Go语言课程介绍
  • 七天学会C语言-第六天(指针)
  • 2023年腾讯云轻量服务器测评:16核 32G 28M 配置CPU测试
  • macos (M2芯片)搭建flutter环境
  • Xilinx FPGA未使用管脚上下拉状态配置(ISE和Vivado环境)
  • 数据结构---链表(java)
  • Qt --- Day02
  • Redis 集合(Set)快速指南 | Navicat
  • 【华为云云耀云服务器L实例评测】- 云原生实践,快捷部署人才招聘平台容器化技术方案!
  • 【Java】泛型 之 什么是泛型
  • Python yaml 详解
  • RabbitMQ消息可靠性(二)-- 消费者消息确认
  • 【python第7课 实例,类】
  • RocketMQ源码解析(上)
  • Webpack打包CSS文件,解决You may need an appropriate loader to handle this file type报错
  • 轮换对称性
  • 【MySQL基础】--- 约束
  • ROS2 的行为树 — 第 1 部分:解锁高级机器人决策和控制
  • kafka事务的详解
  • Flutter Fair逻辑动态化架构设计与实现
  • 【每日一题】74. 搜索二维矩阵
  • 软件测试进大厂,拿高薪,怎么做?看这里!
  • 【读书笔记】基于世界500强的高薪实战Kubernetes课程
  • 【Java 基础篇】Java并发包详解