当前位置: 首页 > news >正文

Spark 计算总销量

Spark 计算总销量

题目:

某电商平台存储了所有商品的销售数据,平台希望能够找到销量最好的前 N 个商品。通过分析销售记录,帮助平台决策哪些商品需要更多的推广资源。

假设你得到了一个商品销售记录的文本文件

product_id, product_name, quantity, sale_date
1, "Smartphone", 10, "2024-11-01"
2, "Laptop", 5, "2024-11-02"
3, "T-Shirt", 25, "2024-11-03"
4, "Smartwatch", 8, "2024-11-04"
5, "Headphones", 12, "2024-11-05"
1, "Smartphone", 15, "2024-11-06"
2, "Laptop", 10, "2024-11-07"
3, "T-Shirt", 10, "2024-11-08"

各字段含义:
product_id: 商品ID
product_name: 商品名称
quantity: 销售数量
sale_date: 销售日期

任务:
计算总销量:计算每个商品的总销量,输出如下。

product_id  product_name  total_sales
1           Smartphone    25
2           Laptop        15
3           T-Shirt       35
4           Smartwatch    8
5           Headphones    12

找出销量最高的前 N 个商品:根据计算出的销量,找出前 N 个销售量最多的商品,N 由用户输入。N=3时输出如下:

product_id  product_name total_sales
3           T-Shirt       35
1          Smartphone     25
2           Laptop        15

运行

  1. 在桌面创建文件buy_count.txt,输入文本内容
  2. Java代码
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.Function2;
import scala.Serializable;
import scala.Tuple2;
import java.util.Scanner;public class Test02 {/** Serializable* 标记一个类可以被序列化,* 即可以将其状态转换为字节流,* 以便进行持久化存储或在网络上传输* */static  class  Product implements Serializable{int product_id;String product_name;int quantity;@Overridepublic String toString() {return  String.format("%-10s %-20s %-10s", product_id, product_name, quantity);}}public static void main(String[] args) {// 文件路径// 获取用户的主目录并构建绝对路径String userHome = System.getProperty("user.home");String logFile = "file://" + userHome + "/Desktop/spark_test.txt";
//        String logFile = "file:///Desktop/spark_test.txt";// SparkConf 对象// setMaster("local")表示应用程序将在本地模式下运行// setAppName("SimpleApp")设置了应用程序的名称为SimpleAppSparkConf conf=new SparkConf().setMaster("local").setAppName("SimpleApp");// JavaSparkContext对象,它是与Spark交互的主要入口点。它接收前面创建的SparkConf对象作为参数JavaSparkContext sc=new JavaSparkContext(conf);// sc.textFile(logFile)加载文本文件内容// .cache()方法会将此RDD缓存起来以便后续重复使用时能更快访问JavaRDD<String> linesRDD = sc.textFile(logFile).cache();/** 按商品分组* JavaPairRDD 键值对* PairFunction用于定义将输入对象转换为键值对的逻辑* filter 方法对linesRDD中的每一行执行过滤(删除标题行)* mapToPair 会对每一行进行处理,生成键值对* 以product_name做键,Product对象做值* */JavaPairRDD<Integer, Product> productRDD = linesRDD.filter(new Function<String, Boolean>() {public Boolean call(String line) {return !line.contains("product_id");}}).mapToPair(new PairFunction<String, Integer, Product>(){@Overridepublic Tuple2<Integer, Product> call(String line) throws Exception {String[] fields = line.split(", ");Product product = new Product();product.product_id = Integer.parseInt(fields[0]);product.product_name = fields[1].replace("\"", "");product.quantity = Integer.parseInt(fields[2]);return new Tuple2<Integer, Product>(product.product_id, product);}});System.out.printf("%-10s %-20s %-10s%n", "product_id", "product_name", "total_sales");productRDD.foreach(tuple -> {Product value = tuple._2;System.out.println(value);});System.out.println("------------------------------------");/** 合并同一商品的数量* */JavaPairRDD<Integer, Product> productRDD2 = productRDD.reduceByKey(new Function2<Product, Product, Product>(){@Overridepublic Product call(Product product, Product product2) throws Exception {product2.quantity += product.quantity;return product2;}});// 按照商品id升序排序JavaPairRDD<Integer, Product> fourproductRankDescRDD = productRDD2.sortByKey(true);System.out.printf("%-10s %-20s %-10s%n", "product_id", "product_name", "total_sales");fourproductRankDescRDD.foreach(tuple -> {Product value = tuple._2;System.out.println(value);});// 将 JavaPairRDD 转换为 JavaRDD<Product>JavaRDD<Product> productRDD3 = productRDD2.values();// 按照 quantity 降序排序JavaRDD<Product> sortedByQuantityRDD = productRDD3.sortBy(product -> product.quantity, false, 1);Scanner scanner = new Scanner(System.in);System.out.print("请输入要显示的前N名商品:");int N =  scanner.nextInt();System.out.printf("%-10s %-20s %-10s%n", "product_id", "product_name", "total_sales");sortedByQuantityRDD.take(N).forEach(product -> System.out.println(product));}
}
  1. IDEA打包:https://blog.csdn.net/kelekele111/article/details/123047189
  2. 终端运行
/usr/local/spark/bin/spark-submit  ~/Desktop/Spark.jar
http://www.lryc.cn/news/497844.html

相关文章:

  • 矩阵置零
  • Ai编程cursor + sealos + devBox实现登录以及用户管理增删改查(十三)
  • 深度解读:生产环境中的日志优化与大数据处理实践20241116
  • docker 搭建gitlab,亲测可用
  • SpringBoot 分层解耦
  • opencv复习
  • flask-socketio相关总结
  • 2024-12-03OpenCV图片处理基础
  • 本地部署开源趣味艺术画板Paint Board结合内网穿透跨网络多设备在线绘画
  • iOS、android的app备案超简单的公钥、md5获取方法
  • SpringCloud 与 SpringBoot版本对应关系,以及maven,jdk
  • 23种设计模式之装饰模式
  • HTMLHTML5革命:构建现代网页的终极指南 - 2. HTMLHTML5H5的区别
  • Django之ORM表操作
  • python下几个淘宝、天猫、京东爬虫实例
  • 级联树结构TreeSelect和上级反查
  • gradle下载慢解决方案2024 /12 /1android studio (Windows环境)
  • Python+OpenCV系列:GRAY BGR HSV
  • 丢垃圾视频时间检测 -- 基于状态机的实现
  • 【QT】一个简单的串口通信小工具(QSerialPort实现)
  • 24/12/5 算法笔记<强化学习> doubleDQN,duelingDQN
  • 道可云人工智能元宇宙每日资讯|全国工商联人工智能委员会成立会议在南京举办
  • MySQL数据库(2)-检查安装与密码重置
  • C# 13 中的新增功能
  • 视频自学笔记
  • easyexcel 导出日期格式化
  • 02-开发环境搭建
  • DBeaver导入csv到数据库
  • React第十一节 组件之间通讯之发布订阅模式(自定义发布订阅器)
  • tcpreplay/tcpdump-重放网络流量/捕获、过滤和分析数据包