当前位置: 首页 > news >正文

(二十八)大数据实战——Flume数据采集之kafka数据生产与消费集成案例

前言

本节内容我们主要介绍一下flume数据采集和kafka消息中间键的整合。通过flume监听nc端口的数据,将数据发送到kafka消息的first主题中,然后在通过flume消费kafka中的主题消息,将消费到的消息打印到控制台上。集成使用flume作为kafka的生产者和消费者。关于nc工具、flume以及kafka的安装部署,这里不在赘述,请读者查看作者往期博客内容。整体架构如下:

正文

  • 启动Kafka集群,创建first主题

- 启动Kafka集群

- 创建first主题

kafka-topics.sh --bootstrap-server hadoop101:9092 --create --topic first --partitions 3 --replication-factor 3

- 查看first主题详情

kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic first

  • 在hadoop101服务器flume安装目录/opt/module/apache-flume-1.9.0/job下创建nc监听服务

 - 创建nc监听的flume任务:job-netcat-flume-kafka.conf

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 1111
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  • 在hadoop102服务器flume安装目录/opt/module/apache-flume-1.9.0/job下创建kafka监听r任务

-  创建kafka监听的flume任务:job-kafka-flume-console.conf

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = logger
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  • 在hadoop102服务器启动kafka监听任务job-kafka-flume-console.conf

- 启动job-kafka-flume-console.conf任务

bin/flume-ng agent -c conf/ -n a1 -f job/job-kafka-flume-console.conf -Dflume.root.logger=INFO,console

  •  在hadoop101服务器启动nc监听任务job-netcat-flume-kafka.conf

 - 启动job-netcat-flume-kafka.conf任务

bin/flume-ng agent -c conf/ -n a1 -f job/job-netcat-flume-kafka.conf -Dflume.root.logger=INFO,console

  •  使用netcat工具发送数据到nc服务1111端口

- 发送nc消息

  • 查看结果 

- 控制台结果

结语

该案例证明了flume1成功采集到了nc监听端口的数据,并将数据发送到了kafka主题first中,flume2成功从kafka主题中消费到了数据并打印到了控制台。关于Flume数据采集之kafka数据生产与消费的集成案例到这里就结束了,我们下期见。。。。。。

http://www.lryc.cn/news/165875.html

相关文章:

  • vue3:22、vue-router的使用
  • 深入理解JVM虚拟机第五篇:一些常用的JVM虚拟机(二)
  • 导数公式及求导法则
  • SpringMVC系列(六)之JSON数据返回以及异常处理机制
  • 民安智库(北京第三方窗口测评)开展汽车消费者焦点小组座谈会调查
  • 【CVPR2021】MVDNet论文阅读分析与总结
  • IDEA指定Maven settings file文件未生效
  • swift UI 和UIKIT 如何配合使用
  • c语言练习题55:IP 地址⽆效化
  • nvidia-persistenced 常驻
  • leetcode 42, 58, 14(*)
  • SpringCloud-微服务CAP原则
  • K8S:Yaml文件详解
  • 机器人连续位姿同步插值轨迹规划—对数四元数、b样条曲线、c2连续位姿同步规划
  • 三维模型3DTile格式轻量化压缩的遇到常见问题与处理方法分析
  • 2023-简单点-开启防火墙后,ping显示请求超时;windows共享盘挂在不上
  • 华为Java工程师面试题
  • 大数据Flink(七十四):SQL的滑动窗口(HOP)
  • Hystrix和Sentinel熔断降级设计理念
  • 获取Windows 10中的照片(旧版)下载
  • 【Redis】Redis作为缓存
  • IDEA(2023)解决运行乱码问题
  • 零基础学前端(二)用简单案例去理解 HTML 、CSS 、JavaScript 概念
  • 线性矩阵不等式(LMI)在控制理论中的应用
  • 如何在Python爬虫程序中使用HTTP代理?
  • ARM架构指令集--专用指令
  • 免费IP类api接口:含ip查询、ip应用场景查询、ip代理识别、IP行业查询...
  • Android Studio 中AGP ,Gradle ,JDK,SDK都是什么?
  • 算法通关18关 | 回溯模板如何解决复原IP问题
  • Layui快速入门之第五节 导航