当前位置: 首页 > news >正文

大数据课程D5——hadoop的Sink

文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州

 ▲ 本章节目的

⚪ 掌握Sink的HDFS Sink;

⚪ 掌握Sink的Logger Sink;

⚪ 掌握Sink的File Roll Sink;

⚪ 掌握Sink的Null Sink;

⚪ 掌握Sink的AVRO Sink;

⚪ 掌握Sink的Custom Sink;

一、HDFS Sink

1. 概述

1. HDFS Sink将收集到的数据写到HDFS中。

2. 在往HDFS上写的时候,支持三种文件类型:文本类型,序列类型以及压缩类型。如果不指定,那么默认使用使得序列类型。

3. 在往HDFS上写数据的时候,数据的存储文件会定时的滚动,如果不指定,那么每隔30s会滚动一次,生成一个文件,那么此时会生成大量的小文件。

2. 配置属性

属性

解释

type

必须是hdfs

hdfs.path

数据在HDFS上的存储路径

hdfs.rollInterval

指定文件的滚动的间隔时间

hdfs.fileType

指定文件的存储类型:DataSteam(文本),SequenceFile(序列),CompressedStream(压缩)

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = hadoop01

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置HDFS Sink

# 类型必须是hdfs

a1.sinks.k1.type = hdfs

# 指定数据在HDFS上的存储路径

a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flumedata

# 指定文件的存储类型

a1.sinks.k1.hdfs.fileType = DataStream

# 指定文件滚动的间隔时间

a1.sinks.k1.hdfs.rollInterval = 3600

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f hdfssink.conf -

Dflume.root.logger=INFO,console

二、Logger Sink

1. 概述

1. Logger Sink是将Flume收集到的数据打印到控制台上。

2. 在打印的时候,为了防止过多的数据将屏幕占满,所以要求body部分的数据不能超过16个字节,超过的部分不打印。

3. Logger Sink在打印的时候,对中文支持不好。

2. 配置属性

属性

解释

type

必须是logger

maxBytesToLog

指定body部分打印的字节数

三、File Roll Sink

1. 概述

1. File Roll Sink将数据写到本地磁盘上。

2. 同HDFS Sink类似,File Roll Sink在往磁盘上写的时候,也有一个滚动的间隔时间,同样是30s,因此在磁盘上同样会形成大量的小文件。

2. 配置属性

属性

解释

type

必须是file_roll

sink.directory

指定数据的存储目录

sink.rollInterval

指定文件滚动的间隔时间

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = hadoop01

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置File Roll Sink

# 类型必须是file_roll

a1.sinks.k1.type = file_roll

# 指定数据在磁盘上的存储目录

a1.sinks.k1.sink.directory = /home/flumedata

# 指定文件的滚动间隔时间

a1.sinks.k1.sink.rollInterval = 3600

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f filerollsink.conf -

Dflume.root.logger=INFO,console

四、Null Sink

1. 概述

1. Null Sink会抛弃所有接收到的数据。

2. 配置属性

属性

解释

type

必须是null

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = hadoop01

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置Null Sink

# 类型必须是null

a1.sinks.k1.type = null

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1f

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f nullsink.conf -

Dflume.root.logger=INFO,console

五、AVRO Sink

1. 概述

1. AVRO Sink会将数据利用AVRO序列化之后写出到指定的节点的指定端口。

2. AVRO Sink结合AVRO Source实现多级、扇入、扇出流动效果。

2. 配置属性

属性

解释

type

必须是avro

hostname

数据要发往的主机的主机名或者IP

port

数据要发往的主机的接收端口

3. 多级流动

1. 第一个节点:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置多级流动

# 类型必须是avro

a1.sinks.k1.type = avro

# 指定主机名或者IP

a1.sinks.k1.hostname = hadoop02

# 指定端口

a1.sinks.k1.port = 8090

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 第二个节点:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = avro

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置多级流动

# 类型必须是avro

a1.sinks.k1.type = avro

# 指定主机名或者IP

a1.sinks.k1.hostname = hadoop03

# 指定端口

a1.sinks.k1.port = 8090

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

3. 第三个节点:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = avro

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

4. 启动Flume,启动的时候,谁接收数据,就先启动谁:

../bin/flume-ng agent -n a1 -c ../conf -f duoji.conf -

Dflume.root.logger=INFO,console

4. 扇入流动

1. 第一个和第二个节点:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置多级流动

# 类型必须是avro

a1.sinks.k1.type = avro

# 指定主机名或者IP

a1.sinks.k1.hostname = hadoop03

# 指定端口

a1.sinks.k1.port = 8090

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 第三个节点:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = avro

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

3. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f shanru.conf -

Dflume.root.logger=INFO,console

5. 扇出流动

1. 第一个节点:

a1.sources = s1

a1.channels = c1 c2

a1.sinks = k1 k2

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.channels.c2.type = memory

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = hadoop02

a1.sinks.k1.port = 8090

a1.sinks.k2.type = avro

a1.sinks.k2.hostname = hadoop03

a1.sinks.k2.port = 8090

a1.sources.s1.channels = c1 c2

a1.sinks.k1.channel = c1

a1.sinks.k2.channel = c2

2. 第二个和第三个节点::

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = avro

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

3. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f shanchu.conf -

Dflume.root.logger=INFO,console

六、Custom Sink

1. 概述

1. 定义一个类实现Sink接口,考虑到需要获取配置属性,所以同样需要实现Configurable接口。

2. 不同于自定义Source,自定Sink需要考虑事务问题。

2. 事务

1. Source收集到数据之后,会通过doPut操作将树放到队列PutList(本质上是一个阻塞式队列)中。

2. PutList会试图将数据推送到Channel中。如果PutList成功将数据放到了Channel中,那么执行doCommit操作;反之执行doRollback操作。

3. Channel有了数据之后,会将数据通过doTake操作推送到TakeList中。

4. TakeList会将数据推送给Sink,如果Sink写出成功,那么执行doCommit;反之执行doRollvack。

3. 自定义Sink步骤

1. 构建Maven工程,导入对应的POM依赖。

2. 定义一个类继承AbstractSink,实现Sink接口和Configurable接口,覆盖configure,start,process和stop方法。

3. 完成之后打成jar包放到Flume安装目录的lib目录下。

4. 编写格式文件:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = hadoop01

a1.sources.s1.port = 8090

a1.channels.c1.type = memory

# 配置自定义Sink

# 类型必须是类的全路径名

a1.sinks.k1.type = cn.tedu.flume.sink.AuthSink

# 指定文件的存储路径

a1.sinks.k1.path = /home/flumedata

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

5. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f authsink.conf -

Dflume.root.logger=INFO,console

http://www.lryc.cn/news/99880.html

相关文章:

  • 【数据结构】27.移除元素
  • 机器学习分布式框架ray运行xgboost实例
  • C++设计模式笔记
  • 简单聊聊创新与创造力
  • 使用TensorFlow训练深度学习模型实战(上)
  • 【Spring】什么是Bean的生命周期及作用域,什么是Spring的执行流程?
  • 立创EDA学习
  • 清风学习笔记—层次分析法—matlab对判断矩阵的一致性检验
  • 大众安徽内推
  • Meta “地平线世界”移动端应用即将上线,手机快乐元宇宙?
  • 更省更快更安全的云服务器,一站式集中管理,随时随地远程——站斧云桌面
  • 出现 Try run Maven import with -U flag (force update snapshots) 的解决方法
  • python多线程
  • Spring Framework 提供缓存管理器Caffeine
  • ZQC的游戏 题解
  • 24考研数据结构-第一章 绪论
  • Gitlab 备份与恢复
  • 数据库—用户权限管理(三十三)
  • C语言【怎么定义变量?】
  • vue中使用vab-magnifier实现放大镜效果
  • 无涯教程-jQuery - Highlight方法函数
  • 【bar堆叠图形绘制】
  • ORACLE数据库灾难恢复
  • base和正则备份
  • ArcGIS Engine 与 Visual Studio版本对照表
  • JPA连接达梦数据库导致auto-ddl失效问题解决
  • 【MATLAB第60期】【更新中】基于MATLAB的ARMAX具有外生回归因子的移动平均自回归模型
  • Vue 常用指令 v-on 自定义参数,事件修饰符
  • 重要通知|关于JumpServer开源堡垒机V2版本产品生命周期的相关说明
  • 下载快 kaggle output