当前位置: 首页 > news >正文

测试PySpark

文章最前: 我是Octopus,这个名字来源于我的中文名--章鱼;我热爱编程、热爱算法、热爱开源。所有源码在我的个人github ;这博客是记录我学习的点点滴滴,如果您对 Python、Java、AI、算法有兴趣,可以关注我的动态,一起学习,共同进步。

这篇文章旨在帮你写出健壮的pyspark 代码。

在这里,通过它写pyspark单元测试,看这个代码通过PySpark built,下载该目录代码,查看JIRA 看板票的pyspark测试

创建PySpark应用

这边一个例子是怎么创建pyspark应用,如果你的应用已经测试,你可以跳过这一段,测试你的pyspark程序。

现在,开始测试你的spark session

from pyspark.sql import SparkSession
from pyspark.sql.functions import col# Create a SparkSession
spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()

接下来,创建一个DataFrame

sample_data = [{"name": "John    D.", "age": 30},{"name": "Alice   G.", "age": 25},{"name": "Bob  T.", "age": 35},{"name": "Eve   A.", "age": 28}]df = spark.createDataFrame(sample_data)

现在,我们对我们的DataFrame来定义转换算子

from pyspark.sql.functions import col, regexp_replace# Remove additional spaces in name
def remove_extra_spaces(df, column_name):# Remove extra spaces from the specified columndf_transformed = df.withColumn(column_name, regexp_replace(col(column_name), "\\s+", " "))return df_transformedtransformed_df = remove_extra_spaces(df, "name")transformed_df.show()
+---+--------+
|age|    name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35|  Bob T.|
| 28|  Eve A.|
+---+--------+

测试你的pyspark应用

现在来测试你的pyspark转换算子。一个选择简化DataFrame测试结果,可以简化数据或者输入数据。更好的方式写测试例子,这里有一些例子怎么去测试我们的代码,这些代码是基于spark 3.5以下版本。对于这些例子做笔记是非常值得的,可以通过测试框架,不管你是使用unittest or pytest; built-in PySpark 测试是单机的,意味着他兼容测试框架和CI测试

选项1: 仅仅使用PySpark Built-in 测试方法

import pyspark.testing
from pyspark.testing.utils import assertDataFrameEqual# Example 1
df1 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
df2 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
assertDataFrameEqual(df1, df2)  # pass, DataFrames are identical
# Example 2
df1 = spark.createDataFrame(data=[("1", 0.1), ("2", 3.23)], schema=["id", "amount"])
df2 = spark.createDataFrame(data=[("1", 0.109), ("2", 3.23)], schema=["id", "amount"])
assertDataFrameEqual(df1, df2, rtol=1e-1)  # pass, DataFrames are approx equal by rtol

 您还可以简单地比较两个 DataFrame 模式:

from pyspark.testing.utils import assertSchemaEqual
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleTypes1 = StructType([StructField("names", ArrayType(DoubleType(), True), True)])
s2 = StructType([StructField("names", ArrayType(DoubleType(), True), True)])assertSchemaEqual(s1, s2)  # pass, schemas are identical

选项 2:使用单元测试

对于更复杂的测试场景,您可能需要使用测试框架。

最流行的测试框架选项之一是单元测试。让我们逐步了解如何使用内置 Pythonunittest库来编写 PySpark 测试。有关该unittest库的更多信息,请参阅此处: https: //docs.python.org/3/library/unittest.html。

首先,您需要一个 Spark 会话。您可以使用包@classmethod中的装饰器unittest来负责设置和拆除 Spark 会话。

import unittestclass PySparkTestCase(unittest.TestCase):@classmethoddef setUpClass(cls):cls.spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()@classmethoddef tearDownClass(cls):cls.spark.stop()

 现在我们来写一个unittest类。

from pyspark.testing.utils import assertDataFrameEqualclass TestTranformation(PySparkTestCase):def test_single_space(self):sample_data = [{"name": "John    D.", "age": 30},{"name": "Alice   G.", "age": 25},{"name": "Bob  T.", "age": 35},{"name": "Eve   A.", "age": 28}]# Create a Spark DataFrameoriginal_df = spark.createDataFrame(sample_data)# Apply the transformation function from beforetransformed_df = remove_extra_spaces(original_df, "name")expected_data = [{"name": "John D.", "age": 30},{"name": "Alice G.", "age": 25},{"name": "Bob T.", "age": 35},{"name": "Eve A.", "age": 28}]expected_df = spark.createDataFrame(expected_data)assertDataFrameEqual(transformed_df, expected_df)
运行时,unittest将选取名称以“test”开头的所有函数。

选项 3:使用Pytest

pytest我们还可以使用最流行的 Python 测试框架之一来编写测试。有关 的更多信息pytest,请参阅此处的文档: https: //docs.pytest.org/en/7.1.x/contents.html。

使用pytest固定装置允许我们在测试之间共享 Spark 会话,并在测试完成时将其拆除。

import pytest@pytest.fixture
def spark_fixture():spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()yield spark

然后我们可以这样定义我们的测试:

import pytest
from pyspark.testing.utils import assertDataFrameEqualdef test_single_space(spark_fixture):sample_data = [{"name": "John    D.", "age": 30},{"name": "Alice   G.", "age": 25},{"name": "Bob  T.", "age": 35},{"name": "Eve   A.", "age": 28}]# Create a Spark DataFrameoriginal_df = spark.createDataFrame(sample_data)# Apply the transformation function from beforetransformed_df = remove_extra_spaces(original_df, "name")expected_data = [{"name": "John D.", "age": 30},{"name": "Alice G.", "age": 25},{"name": "Bob T.", "age": 35},{"name": "Eve A.", "age": 28}]expected_df = spark.createDataFrame(expected_data)assertDataFrameEqual(transformed_df, expected_df)

当您使用该pytest命令运行测试文件时,它将选取名称以“test”开头的所有函数。

把它们放在一起!

让我们在单元测试示例中一起查看所有步骤。

# pkg/etl.py
import unittestfrom pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_replace
from pyspark.testing.utils import assertDataFrameEqual# Create a SparkSession
spark = SparkSession.builder.appName("Sample PySpark ETL").getOrCreate()sample_data = [{"name": "John    D.", "age": 30},{"name": "Alice   G.", "age": 25},{"name": "Bob  T.", "age": 35},{"name": "Eve   A.", "age": 28}]df = spark.createDataFrame(sample_data)# Define DataFrame transformation function
def remove_extra_spaces(df, column_name):# Remove extra spaces from the specified column using regexp_replacedf_transformed = df.withColumn(column_name, regexp_replace(col(column_name), "\\s+", " "))return df_transformed
# pkg/test_etl.py
import unittestfrom pyspark.sql import SparkSession# Define unit test base class
class PySparkTestCase(unittest.TestCase):@classmethoddef setUpClass(cls):cls.spark = SparkSession.builder.appName("Sample PySpark ETL").getOrCreate()@classmethoddef tearDownClass(cls):cls.spark.stop()# Define unit test
class TestTranformation(PySparkTestCase):def test_single_space(self):sample_data = [{"name": "John    D.", "age": 30},{"name": "Alice   G.", "age": 25},{"name": "Bob  T.", "age": 35},{"name": "Eve   A.", "age": 28}]# Create a Spark DataFrameoriginal_df = spark.createDataFrame(sample_data)# Apply the transformation function from beforetransformed_df = remove_extra_spaces(original_df, "name")expected_data = [{"name": "John D.", "age": 30},{"name": "Alice G.", "age": 25},{"name": "Bob T.", "age": 35},{"name": "Eve A.", "age": 28}]expected_df = spark.createDataFrame(expected_data)assertDataFrameEqual(transformed_df, expected_df)
unittest.main(argv=[''], verbosity=0, exit=False)
在 1.734 秒内完成 1 次测试
<unittest.main.TestProgram 位于 0x174539db0>
http://www.lryc.cn/news/198841.html

相关文章:

  • C语言- 原子操作
  • 设置hadoop+安装java环境
  • 阿里云新加坡主机服务器选择
  • 21天打卡掌握java基础操作
  • SQL题目记录
  • Linux程序调试器——gdb的使用
  • 前端打包项目上线-nginx
  • 创龙瑞芯微RK3568参数修改(调试口波特率和rootfs文件)
  • VMware——VMware17安装WindowServer2012R2环境(图解版)
  • ModuleNotFoundError: No module named ‘torch‘
  • 采用Spring Boot框架开发的医院预约挂号系统3e3g0+vue+java
  • Jmeter性能测试(压力测试)
  • NetCore/Net8下使用Redis的分布式锁实现秒杀功能
  • openGauss学习笔记-102 openGauss 数据库管理-管理数据库安全-客户端接入之查看数据库连接数
  • lspci源码
  • CMake教程-第 8 步:添加自定义命令和生成文件
  • 快速入门:Spring Cache
  • 探索音频传输系统:数字声音的无限可能 | 百能云芯
  • 【C++】-c++的类型转换
  • 《论文阅读28》OGMM
  • 忆联分布式数据库存储解决方案,助力MySQL实现高性能、低时延
  • 网络安全内网渗透之信息收集--systeminfo查看电脑有无加域
  • MySQL高可用架构学习
  • seata的AT模式分析
  • 【算法练习Day22】 组合总和 III电话号码的字母组合
  • react-------JS对象、数组方法实际应用集合
  • AWS SAP-C02教程6--安全
  • Go学习第一章——开发环境安装以及快速入门(GoLand)
  • 大数据学习(14)-Map Join和Common Join
  • Docker安装ES7.14和Kibana7.14(无账号密码)