当前位置: 首页 > news >正文

Hadoop中MapReduce过程中Shuffle过程实现自定义排序

文章目录

  • Hadoop中MapReduce过程中Shuffle过程实现自定义排序
    • 一、引言
    • 二、实现WritableComparable接口
      • 1、自定义Key类
    • 三、使用Job.setSortComparatorClass方法
      • 2、设置自定义排序器
      • 3、自定义排序器类
    • 四、使用示例
    • 五、总结

Hadoop中MapReduce过程中Shuffle过程实现自定义排序

在这里插入图片描述

一、引言

MapReduce框架中的Shuffle过程是连接Map阶段和Reduce阶段的桥梁,负责将Map任务的输出结果按照key进行分组和排序,并将相同key的数据传递给对应的Reduce任务进行处理。Shuffle过程的性能直接影响到整个MapReduce作业的执行效率。在默认情况下,Hadoop使用TotalOrderPartitioner进行排序,但有时我们需要根据特定的业务逻辑进行自定义排序。本文将介绍两种方法来实现自定义排序:实现WritableComparable接口和使用Job.setSortComparatorClass方法。下面是详细的步骤和代码示例。

二、实现WritableComparable接口

1、自定义Key类

首先,我们需要定义一个类并实现WritableComparable接口,该接口要求实现compareTo方法,用于定义排序逻辑。

package mr;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class Employee implements WritableComparable<Employee> {private int empno;private String ename;private String job;private int mgr;private String hiredate;private int sal;private int comm;private int deptno;@Overridepublic String toString(){return "Employee[empno="+empno+",ename="+ename+",sal="+sal+",deptno="+deptno+"]";}@Overridepublic int compareTo(Employee o) {// 多个列的排序:select * from emp order by deptno,sal;// 首先按照deptno排序if(this.deptno > o.getDeptno()){return 1;}else if(this.deptno < o.getDeptno()){return -1;}// 如果deptno相等,按照sal排序if(this.sal >= o.getSal()){return 1;}else{return -1;}}@Overridepublic void write(DataOutput output) throws IOException {// 序列化output.writeInt(this.empno);output.writeUTF(this.ename);output.writeUTF(this.job);output.writeInt(this.mgr);output.writeUTF(this.hiredate);output.writeInt(this.sal);output.writeInt(this.comm);output.writeInt(this.deptno);}@Overridepublic void readFields(DataInput input) throws IOException {// 反序列化this.empno = input.readInt();this.ename = input.readUTF();this.job = input.readUTF();this.mgr = input.readInt();this.hiredate = input.readUTF();this.sal = input.readInt();this.comm = input.readInt();this.deptno = input.readInt();}
}

三、使用Job.setSortComparatorClass方法

2、设置自定义排序器

除了实现WritableComparable接口外,我们还可以使用Job.setSortComparatorClass方法来设置自定义排序器。这种方法允许我们在不修改Key类的情况下实现自定义排序。

package mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class CustomSort {public static class Map extends Mapper<Object, Text, Employee, IntWritable> {private static Employee emp = new Employee();private static IntWritable one = new IntWritable(1);@Overrideprotected void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] line = value.toString().split("\t");emp.setEmpno(Integer.parseInt(line[0]));emp.setEname(line[1]);emp.setJob(line[2]);emp.setMgr(Integer.parseInt(line[3]));emp.setHiredate(line[4]);emp.setSal(Integer.parseInt(line[5]));emp.setComm(Integer.parseInt(line[6]));emp.setDeptno(Integer.parseInt(line[7]));context.write(emp, one);}}public static class Reduce extends Reducer<Employee, IntWritable, Employee, IntWritable> {@Overrideprotected void reduce(Employee key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {for (IntWritable val : values) {context.write(key, val);}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "CustomSort");job.setJarByClass(CustomSort.class);job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(Employee.class);job.setOutputValueClass(IntWritable.class);// 设置自定义排序器job.setSortComparatorClass(EmployeeComparator.class);Path in = new Path("hdfs://localhost:9000/mr/in/customsort");Path out = new Path("hdfs://localhost:9000/mr/out/customsort");FileInputFormat.addInputPath(job, in);FileOutputFormat.setOutputPath(job, out);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

3、自定义排序器类

package mr;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class EmployeeComparator extends WritableComparator {protected EmployeeComparator() {super(Employee.class, true);}@Overridepublic int compare(WritableComparable w1, WritableComparable w2) {Employee e1 = (Employee) w1;Employee e2 = (Employee) w2;// 首先按照deptno排序int deptCompare = Integer.compare(e1.getDeptno(), e2.getDeptno());if (deptCompare != 0) {return deptCompare;}// 如果deptno相等,按照sal排序return Integer.compare(e1.getSal(), e2.getSal());}
}

四、使用示例

下面是一个简单的MapReduce示例,展示了Shuffle过程在实际应用中的使用。这个示例中,我们使用了自定义的Employee类作为Key,并设置了自定义的排序器EmployeeComparator

五、总结

通过实现WritableComparable接口和使用Job.setSortComparatorClass方法,我们可以在Hadoop MapReduce过程中实现自定义排序。这两种方法提供了灵活的排序机制,允许我们根据不同的业务需求对数据进行排序处理,从而提高数据处理的效率和准确性。


版权声明:本博客内容为原创,转载请保留原文链接及作者信息。

参考文章

  • Hadoop之mapreduce数据排序案例(详细代码)
  • Java Job.setSortComparatorClass方法代码示例
http://www.lryc.cn/news/510940.html

相关文章:

  • 数位dp-acwing
  • 智慧园区小程序开发制作功能介绍
  • STM32高级 物联网之Wi-Fi通讯
  • LLM预训练recipe — 摘要版
  • 波动理论、传输线和S参数网络
  • nginx-1.23.2版本RPM包发布
  • 如何用WPS AI提高工作效率
  • LabVIEW应用在工业车间
  • Elasticsearch:normalizer
  • 动态规划子序列问题系列一>等差序列划分II
  • 48页PPT|2024智慧仓储解决方案解读
  • 低代码开源项目Joget的研究——Joget8社区版安装部署
  • upload-labs关卡记录15
  • 1.使用 Couchbase 数仓和 Temporal(一个分布式任务调度和编排框架)实现每 5 分钟的增量任务
  • matrix-breakout-2-morpheus
  • 农历节日倒计时:基于Python的公历与农历日期转换及节日查询小程序
  • 【RabbitMQ的死信队列】
  • 掌握软件工程基础:知识点全面解析【chap02】
  • 公路边坡安全监测中智能化+定制化+全面守护的应用方案
  • 闲谭Scala(3)--使用IDEA开发Scala
  • Go语言反射从入门到进阶
  • 【基于rust-wasm的前端页面转pdf组件和示例】
  • ARM64 Windows 10 IoT工控主板运行x86程序效率测试
  • 开放世界目标检测 Grounding DINO
  • easegen将教材批量生成可控ppt课件方案设计
  • 2002 - Can‘t connect to server on ‘192.168.1.XX‘ (36)
  • 【虚拟机网络拓扑记录】
  • 【单片机通讯协议】—— 常用的UART/I2C/SPI等通讯协议的基本原理与时序分析
  • Vue3 核心语法
  • LLaMA-Factory GLM4-9B-CHAT LoRA 指令微调实战