当前位置: 首页 > news >正文

MapReduce实战案例(3)

案例三: MR实战之TOPN(自定义GroupingComparator)

项目准备

  1. 需求+测试数据

有如下订单数据

订单id商品id成交金额
Order_0000001Pdt_01222.8
Order_0000001Pdt_0525.8
Order_0000002Pdt_03522.8
Order_0000002Pdt_04122.4
Order_0000002Pdt_05722.4
Order_0000003Pdt_01222.8

现在需要求出每一个订单中成交金额最大的一笔交易

  1. 分析

    a) 利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce

    b) 在reduce端利用groupingcomparator将订单id相同的kv聚合成组,然后取第一个即是最大值

项目实现

a)自定义groupingcomparator

/*** @Author 千锋大数据教学团队* @Company 千锋好程序员大数据* @Description 用于控制shuffle过程中reduce端对kv对的聚合逻辑*/
public class ItemidGroupingComparator extends WritableComparator {protected ItemidGroupingComparator() {super(OrderBean.class, true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {OrderBean abean = (OrderBean) a;OrderBean bbean = (OrderBean) b;//将item_id相同的bean都视为相同,从而聚合为一组return abean.getItemid().compareTo(bbean.getItemid());}
}
复制代码

文末扫码领取福利! 

b)定义订单信息bean

/*** @Author 千锋大数据教学团队* @Company 千锋好程序员大数据* @Description 订单信息bean,实现hadoop的序列化机制*/
public class OrderBean implements WritableComparable<OrderBean>{private Text itemid;private DoubleWritable amount;public OrderBean() {}public OrderBean(Text itemid, DoubleWritable amount) {set(itemid, amount);}public void set(Text itemid, DoubleWritable amount) {this.itemid = itemid;this.amount = amount;}public Text getItemid() {return itemid;}public DoubleWritable getAmount() {return amount;}@Overridepublic int compareTo(OrderBean o) {int cmp = this.itemid.compareTo(o.getItemid());if (cmp == 0) {cmp = -this.amount.compareTo(o.getAmount());}return cmp;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(itemid.toString());out.writeDouble(amount.get());}@Overridepublic void readFields(DataInput in) throws IOException {String readUTF = in.readUTF();double readDouble = in.readDouble();this.itemid = new Text(readUTF);this.amount= new DoubleWritable(readDouble);}@Overridepublic String toString() {return itemid.toString() + "\t" + amount.get();}
}
复制代码

c) 编写MapReduce处理流程

/*** @Author 千锋大数据教学团队* @Company 千锋好程序员大数据* @Description 利用secondarysort机制输出每种item订单金额最大的记录*/public class SecondarySort {static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{OrderBean bean = new OrderBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] fields = StringUtils.split(line, "\t");bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));context.write(bean, NullWritable.get());}}static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{//在设置了groupingcomparator以后,这里收到的kv数据 就是:  <1001 87.6>,null  <1001 76.5>,null  .... //此时,reduce方法中的参数key就是上述kv组中的第一个kv的key:<1001 87.6>//要输出同一个item的所有订单中最大金额的那一个,就只要输出这个key@Overrideprotected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {context.write(key, NullWritable.get());}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(SecondarySort.class);job.setMapperClass(SecondarySortMapper.class);job.setReducerClass(SecondarySortReducer.class);job.setOutputKeyClass(OrderBean.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//指定shuffle所使用的GroupingComparator类job.setGroupingComparatorClass(ItemidGroupingComparator.class);//指定shuffle所使用的partitioner类job.setPartitionerClass(ItemIdPartitioner.class);job.setNumReduceTasks(3);job.waitForCompletion(true);}}

 

 也可以观看视频:

千锋大数据Hadoop全新增强版-先导片

http://www.lryc.cn/news/91303.html

相关文章:

  • Socket(三)
  • 【JVM】12. 垃圾回收相关概念
  • Java 版 spring cloud 工程系统管理 工程项目管理系统源码 工程项目各模块及其功能点清单
  • 【Linux系统基础快速入门详解】Linux系统命令行介绍、命令提示符知识详解: ~/#/@等符号
  • Python 面向对象编程笔记:中级面向对象
  • JVM学习笔记(上)
  • 反爬虫技术
  • JAVA中.equals()与 ==的区别
  • 华为OD机试之羊、狼、农夫过河(Java源码)
  • C++ string的简单应用
  • Java中的阻塞队列
  • PriorityBlockingQueue无界阻塞优先级队列
  • 「HTML和CSS入门指南」p 标签详解
  • 【单目标优化算法】孔雀优化算法(Matlab代码实现)
  • chatgpt赋能python:Python同一行多个语句:如何提高你的编程效率?
  • Java反射概述
  • 《网络是怎样连接的》(一)
  • Flink on yarn任务日志怎么看
  • 二次元的登录界面
  • 2. 量化多因子数据清洗——去极值、标准化、正交化、中性化
  • 皮卡丘反射型XSS
  • 巧计口诀-软件测试的生命周期,黑盒测试设计方法
  • Android系统的Ashmem匿名共享内存系统分析(1)- Ashmem驱动
  • Redis 事务详细介绍
  • 2023-5-29第二十九天
  • 【第三方库】PHP实现创建PDF文件和编辑PDF文件
  • 线程的回收及内存演示
  • 高精度倾角传感器测量原理
  • Android 12 init流程分析
  • 【Python小技巧】Python操控Chrome浏览器实现网页打开、切换、关闭(送独家Chrome操作打包类源码、Chrome浏览器Cookie在哪里?)