当前位置: 首页 > news >正文

Hadoop3教程(十三):MapReduce中的分区

文章目录

  • (96) 默认HashPartitioner分区
  • (97) 自定义分区案例
  • (98)分区数与Reduce个数的总结
  • 参考文献

(96) 默认HashPartitioner分区

分区,是Shuffle里核心的一环,不同分区的数据最终会被送进不同的ReduceTask去处理。之前的几个小节里也都讲过分区。
Hadoop里默认的分区方式是HashPartitioner分区,核心代码:

public class HashPartitioner<K, V> extends Partitioner<K, V> {public int getPartition(K key, V value, iint numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}
}

在HashPartitioner里,每个key分到哪个ReduceTask(可以理解成Key属于哪个分区),是根据每个key的hashCode对ReduceTask的个数取模得到的,用户是没法控制的。

这里是为什么还要& Integer.MAX_VALUE呢?

主要是为了防止溢写,通过& Integer.MAX_VALUE,将key的hash值控制在Integer.MAX_VALUE及之下。

从代码里看,在往环形缓冲区写的时候,如果识别到numReduceTasks > 1,则启用HashPartitioner分区,如果numReduceTasks = 1,那就不启用了,直接return numReduceTasks - 1

我们也可以自定义Partitioner,自定义类需要继承Partitioner类,并重写里面的getPartition()方法。

public class CustomPartitioner extendsPartitioner<Text, FlowBean>{@overridepublic int getPartition(Text key, FlowBean value, int numPartitions){//控制分区代码逻辑。。。。。。return partition;}}

然后在驱动类里,设置上写好的自定义Partitioner:

job.setPartitionerClass(CustomPartitioner.class);

最后再设置上ReduceTask的数量:

job.setNumReduceTasks(5);

如果不设置ReduceTask的数量,那分区数默认是1,直接return 0,不会启用自定义分区。

(97) 自定义分区案例

首先抛出一个需求:将一堆手机号按照归属地的省份输出到不同的文件里。

已有一个phone_data.txt文件。

所以期望的输出数据是什么样子的呢?

手机号136/137/138/139开头的分别放进4个独立的文件里,然后其他的手机号放到一个文件里。最终形成5个文件。

显而易见,这个需求的核心在于自定义分区上。

所以我们需要写一个自定义分区类,假设它叫ProvincePartitioner,我们希望它能做到以下分配:

136 分区0
137 分区1
138 分区2
139 分区3
其他 分区4

等分区类建好后,别忘记在驱动里注册上这个类,并定义好ReduceTask数量。

job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);

展示一下ProvincePartitioner类的代码:

package com.atguigu.mapreduce.partitioner;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;public class ProvincePartitioner extends Partitioner<Text, FlowBean> {@Overridepublic int getPartition(Text text, FlowBean flowBean, int numPartitions) {//获取手机号前三位prePhoneString phone = text.toString();String prePhone = phone.substring(0, 3);//定义一个分区号变量partition,根据prePhone设置分区号int partition;if("136".equals(prePhone)){partition = 0;}else if("137".equals(prePhone)){partition = 1;}else if("138".equals(prePhone)){partition = 2;}else if("139".equals(prePhone)){partition = 3;}else {partition = 4;}//最后返回分区号partitionreturn partition;}
}

(98)分区数与Reduce个数的总结

思考这么一个问题,如果自定义Partitioner中定义了5个分区,但是驱动类里注册的时候,只声明了4个分区,即job.setNumReduceTask=4,那这时候代码会正常运行么?

不会,会报java.io.IOException。

至于为什么报IO异常,自然是MapTask中,在往环形缓冲器Collector里写的时候,发现没有第5个分区,写不进去当然就报IO异常。

但是,设置job.setNumReduceTask=1,代码是可以跑的,这是为什么呢?

原因其实之前提过,这是因为设置为1后,MapTask里,Collector在collect数据的时候,分区就不走我们自定义的Partitioner,而是直接return 0了,到最后Reduce阶段也只会生成一个文件。

这里是有点反直觉的,需要注意。

那我如果job.setNumReduceTask=6呢,代码还能跑吗?

可以跑,且会生成6个文件,只不过第6个文件是空的。

总结一下:

  • 当NumReduceTask > getPartition()里定义的分区数量,可以正常运行,但是相应的,会多余生成一些空的文件,浪费计算资源和存储资源;
  • 当 1 < NumReduceTask < getPartition()分区量,会报IO异常,因为少的那一部分分区的数据会无法写入;
  • 当NumReduceTask = 1时,不会调用自定义分区器,而是会将所有的数据都交付给一个ReduceTask,最后也只会生成一个文件。
  • 自定义分区类时,分区号必须从0开始,且必须是连续的,即是逐一累加的

最后一条比较重要,即必须是0/1/2/3/4/5/…这种形式,而不能是0/10/11/20这种。

2023-7-24 17:08:08 我有个小问题,就是驱动类里设置setNumReduceTask的时候,能不能设置成动态的,就是根据输入数据调整的呢?

查了一下,确实是有这种取巧的方式,比如说使用自定义的InputFormat,在读取数据的同时,获取数据量的情况,并根据这些信息动态调整ReduceTask的数量。这里就不多讲了,有兴趣可以查查。

参考文献

  1. 【尚硅谷大数据Hadoop教程,hadoop3.x搭建到集群调优,百万播放】
http://www.lryc.cn/news/196567.html

相关文章:

  • 笔记本Win10系统一键重装操作方法
  • FilterRegistrationBean能不能排除指定url
  • 【LeetCode】36. 有效的数独
  • 华为---PPP协议简介及示例配置
  • asp.net老年大学信息VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio计算机毕业设计
  • 模型量化笔记--对称量化和非对称量化
  • PA2019 Terytoria
  • 内容分发网络CDN分布式部署真的可以加速吗?原理是什么?
  • 微服务docker部署实战
  • js实现拖拽功能
  • 数据库主从切换过程中Druid没法获取连接错误
  • 【iOS】Mac M1安装iPhone及iPad的app时设置问题
  • Springboot 启动报错@spring.active@解析错误
  • 【算法挨揍日记】day15——560. 和为 K 的子数组、974. 和可被 K 整除的子数组
  • 数字时代的探索与革新:Socks5代理的引领作用
  • 算法-堆/归并排序-排序链表
  • word 如何编写4x4矩阵
  • INTELlij IDEA编辑VUE项目
  • linux进程间通讯--信号量
  • VS Code连接远程Linux服务器开发c++项目
  • stable diffusion的模型选择,采样器选择,关键词
  • BI零售数据分析:以自身视角展开分析
  • Maven 使用教程(三)
  • 行秋找工作的记录
  • vue项目打包,使用externals抽离公共的第三方库
  • 九阳真经之各大厂校招
  • Go语言入门心法(五): 函数
  • gitignore文件的语法规则
  • vscode提示扩展主机在过去5分钟内意外终止了3次,解决方法
  • 【算法挨揍日记】day16——525. 连续数组、1314. 矩阵区域和