当前位置: 首页 > news >正文

Flume采集Kafka并把数据sink到OSS

安装环境

  1. Java环境, 略 (Flume依赖Java)
  2. Flume下载, 略
  3. Scala环境, 略 (Kafka依赖Scala)
  4. Kafak下载, 略
  5. Hadoop下载, 略 (不需要启动, 写OSS依赖)

配置Hadoop

下载JindoSDK(连接OSS依赖), 下载地址Github
解压后配置环境变量

export JINDOSDK_HOME=/usr/lib/jindosdk-x.x.x
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:${JINDOSDK_HOME}/lib/*

修改Hadoop配置, core-site.xml

<property><name>fs.oss.credentials.provider</name><value>com.aliyun.jindodata.oss.auth.SimpleCredentialsProvider</value></property><property><name>fs.oss.accessKeyId</name><value>xxxx</value></property><property><name>fs.oss.accessKeySecret</name><value>xxxx</value></property><property><name>fs.oss.endpoint</name><value>xxxxx</value></property><property><name>fs.AbstractFileSystem.oss.impl</name><value>com.aliyun.jindodata.oss.JindoOSS</value></property><property><name>fs.oss.impl</name><value>com.aliyun.jindodata.oss.JindoOssFileSystem</value></property>

配置可参考非EMR集群接入OSS-HDFS服务快速入门

配置Flume

此部分全文最关键, 请仔细看

  1. 基础配置部分, Flume配置
a1.sources = source1
a1.sinks = k1
a1.channels = c1a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.channels = c1
a1.sources.source1.kafka.bootstrap.servers = xxx
a1.sources.source1.kafka.topics = test
a1.sources.source1.kafka.consumer.group.id = flume-sink-group # 消费者组, 云组件需要先在管理后台创建
a1.sources.source1.kafka.consumer.auto.offset.reset = earliest # 从头消费Kafka里数据a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = oss://xxx/test/%Y%m%d # 自动按天分文件夹
a1.sinks.k1.hdfs.fileType=DataStreama1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

可参考使用Flume同步EMR Kafka集群的数据至OSS-HDFS服务
2. 进阶配置, 根据自己情况按需配置

a1.sinks.k1.hdfs.rollInterval = 600 # 5分钟切换一个新文件
a1.sinks.k1.hdfs.rollSize = 134217728 # 或者文件大小达到128M则切换新文件
a1.sinks.k1.hdfs.rollCount = 0 # 写入多少条数据切换新文件, 0为不限制

我这里是为了防止sink的文件过于零碎, 但因为使用的memory channel, 缓存时间过长容易丢数据
3. Flume JVM参数
默认启动时-Xmx20m, 过于小了, 加大堆内存可以直接放开flume-env.shJAVA_OPTS的注释

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
  1. Channel问题
    如果对数据一致性要求较高, 可以把memory channel改用file channel, 请自行研究

XX启动!

几条测试命令

bin/zookeeper-server-start.sh config/zookeeper.properties # 启动zookeeper
bin/kafka-server-start.sh config/server.properties # 启动kafak服务bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 # 启动flumebin/kafka-console-producer.sh --topic flume-test --bootstrap-server localhost:9092 # 启动一个生产者写测试数据
http://www.lryc.cn/news/249601.html

相关文章:

  • flutter,uni-app开发调试ios
  • MybatisBatchUtils功能介绍
  • Flutter使用flutter_gen管理资源文件
  • vue3 setup语法糖,常用的几个:defineProps、defineEmits、defineExpose、
  • JC/T 2087-2011建筑装饰用仿自然面艺术石检测
  • C语言——写一个简单函数,找两个数中最大者
  • 机器学习中的混淆矩阵
  • QT基础实践之简易计算器
  • 南大通用 GBase 8s数据库级别权限
  • 对话式数据需求激增,景联文科技提供高质量多轮对话数据定制采集标注服务
  • python第1天之常识及环境安装
  • 中国高纯石英砂行业市场研究与投资前景报告(2024版)
  • 遭到美国做空机构“灰熊”做空后,人工智能公司商汤科技股价暴跌
  • 异常数据检测 | Python实现孤立森林(IsolationForest)异常检测
  • 营销互动类小游戏策划与开发
  • 主机的容器化技术介绍
  • 网络基础『发展 ‖ 协议 ‖ 传输 ‖ 地址』
  • Aapche Dubbo 不安全的 Java 反序列化 (CVE-2019-17564)
  • B/S软件开发架构
  • 【docker系列】docker实战之部署SpringBoot项目
  • 【数据结构】时间和空间复杂度
  • 【Web】[GKCTF 2021]easycms
  • VM CentOS7安装ffmpeg
  • PyTorch Models
  • viple模拟器使用(四):unity模拟器中实现沿右墙迷宫算法
  • 面试送分题!“商品分类浏览”如何测试?
  • 在浏览器中直接打开PDF
  • docker集群的详解以及超详细搭建
  • 4进制思路。。。。。。。。
  • 解决ansible批量加入新IP涉及known_hosts报错的问题