当前位置: 首页 > news >正文

4、数据清洗

4、数据清洗

前面我们处理的数据实际上都是已经被处理好的规整数据,但是在大数据整个生产过程中,需要先对数据进行数据清洗,将杂乱无章的数据整理为符合后面处理要求的规整数据。

数据去重

'''
1.删除重复数据groupby().count():可以看到数据的重复情况
'''
df = spark.createDataFrame([(1, 144.5, 5.9, 33, 'M'),(2, 167.2, 5.4, 45, 'M'),(3, 124.1, 5.2, 23, 'F'),(4, 144.5, 5.9, 33, 'M'),(5, 133.2, 5.7, 54, 'F'),(3, 124.1, 5.2, 23, 'F'),(5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])# 查看重复记录
#无意义重复数据去重:数据中行与行完全重复
# 1.首先删除完全一样的记录
df2 = df.dropDuplicates()#有意义去重:删除除去无意义字段之外的完全重复的行数据
# 2.其次,关键字段值完全一模一样的记录(在这个例子中,是指除了id之外的列一模一样)
# 删除某些字段值完全一样的重复记录,subset参数定义这些字段
df3 = df2.dropDuplicates(subset = [c for c in df2.columns if c!='id'])
# 3.有意义的重复记录去重之后,再看某个无意义字段的值是否有重复(在这个例子中,是看id是否重复)
# 查看某一列是否有重复值
import pyspark.sql.functions as fn
df3.agg(fn.count('id').alias('id_count'),fn.countDistinct('id').alias('distinct_id_count')).collect()
# 4.对于id这种无意义的列重复,添加另外一列自增iddf3.withColumn('new_id',fn.monotonically_increasing_id()).show()

缺失值处理

'''
2.处理缺失值
2.1 对缺失值进行删除操作(行,列)
2.2 对缺失值进行填充操作(列的均值)
2.3 对缺失值对应的行或列进行标记
'''
df_miss = spark.createDataFrame([
(1, 143.5, 5.6, 28,'M', 100000),
(2, 167.2, 5.4, 45,'M', None),
(3, None , 5.2, None, None, None),
(4, 144.5, 5.9, 33, 'M', None),
(5, 133.2, 5.7, 54, 'F', None),
(6, 124.1, 5.2, None, 'F', None),
(7, 129.2, 5.3, 42, 'M', 76000),],['id', 'weight', 'height', 'age', 'gender', 'income'])# 1.计算每条记录的缺失值情况df_miss.rdd.map(lambda row:(row['id'],sum([c==None for c in row]))).collect()
[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]# 2.计算各列的缺失情况百分比
df_miss.agg(*[(1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing') for c in df_miss.columns]).show()# 3、删除缺失值过于严重的列
# 其实是先建一个DF,不要缺失值的列
df_miss_no_income = df_miss.select([
c for c in df_miss.columns if c != 'income'
])# 4、按照缺失值删除行(threshold是根据一行记录中,缺失字段的百分比的定义)
df_miss_no_income.dropna(thresh=3).show()# 5、填充缺失值,可以用fillna来填充缺失值,
# 对于bool类型、或者分类类型,可以为缺失值单独设置一个类型,missing
# 对于数值类型,可以用均值或者中位数等填充# fillna可以接收两种类型的参数:
# 一个数字、字符串,这时整个DataSet中所有的缺失值都会被填充为相同的值。
# 也可以接收一个字典{列名:值}这样# 先计算均值,并组织成一个字典
means = df_miss_no_income.agg( *[fn.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).toPandas().to_dict('records')[0]
# 然后添加其它的列
means['gender'] = 'missing'df_miss_no_income.fillna(means).show()

异常值处理

'''
3、异常值处理
异常值:不属于正常的值 包含:缺失值,超过正常范围内的较大值或较小值
分位数去极值
中位数绝对偏差去极值
正态分布去极值
上述三种操作的核心都是:通过原始数据设定一个正常的范围,超过此范围的就是一个异常值
'''
df_outliers = spark.createDataFrame([
(1, 143.5, 5.3, 28),
(2, 154.2, 5.5, 45),
(3, 342.3, 5.1, 99),
(4, 144.5, 5.5, 33),
(5, 133.2, 5.4, 54),
(6, 124.1, 5.1, 21),
(7, 129.2, 5.3, 42),
], ['id', 'weight', 'height', 'age'])
# 设定范围 超出这个范围的 用边界值替换# approxQuantile方法接收三个参数:参数1,列名;参数2:想要计算的分位点,可以是一个点,也可以是一个列表(0和1之间的小数),第三个参数是能容忍的误差,如果是0,代表百分百精确计算。cols = ['weight', 'height', 'age']bounds = {}
for col in cols:quantiles = df_outliers.approxQuantile(col, [0.25, 0.75], 0.05)IQR = quantiles[1] - quantiles[0]bounds[col] = [quantiles[0] - 1.5 * IQR,quantiles[1] + 1.5 * IQR]>>> bounds
{'age': [-11.0, 93.0], 'height': [4.499999999999999, 6.1000000000000005], 'weight': [91.69999999999999, 191.7]}# 为异常值字段打标志
outliers = df_outliers.select(*['id'] + [( (df_outliers[c] < bounds[c][0]) | (df_outliers[c] > bounds[c][1]) ).alias(c + '_o') for c in cols ])
outliers.show()
#
# +---+--------+--------+-----+
# | id|weight_o|height_o|age_o|
# +---+--------+--------+-----+
# |  1|   false|   false|false|
# |  2|   false|   false|false|
# |  3|    true|   false| true|
# |  4|   false|   false|false|
# |  5|   false|   false|false|
# |  6|   false|   false|false|
# |  7|   false|   false|false|
# +---+--------+--------+-----+# 再回头看看这些异常值的值,重新和原始数据关联df_outliers = df_outliers.join(outliers, on='id')
df_outliers.filter('weight_o').select('id', 'weight').show()
# +---+------+
# | id|weight|
# +---+------+
# |  3| 342.3|
# +---+------+df_outliers.filter('age_o').select('id', 'age').show()
# +---+---+
# | id|age|
# +---+---+
# |  3| 99|
# +---+---+
http://www.lryc.cn/news/113363.html

相关文章:

  • Python-OpenCV 图像的基础操作
  • test111
  • 17. Spring 事务
  • 【C# 基础精讲】运算符和表达式
  • 【搜索】DFS连通性模型
  • 项目优化后续 ,手撸一个精简版VUE项目框架!
  • 【深度学习笔记】TensorFlow 基础
  • 面试题-springcloud中的负载均衡是如何实现的?
  • flink的ProcessWindowFunction函数的三种状态
  • day50-springboot+ajax分页
  • Win7 专业版Windows time w32time服务电脑重启后老是已停止
  • 全网最强,接口自动化测试-token登录关联实战总结(超详细)
  • OLAP ModelKit Crack,ADO.NET和IList
  • 4 三组例子,用OpenCV玩转图像-AI-python
  • 计算机网络-三种交换方式
  • 03 制作Ubuntu启动盘
  • 【JavaSE】String类中常用的字符串方法(超全)
  • Bootload U-Boot分析
  • 以公益之行,筑责任之心——2023年中创算力爱心公益助学活动
  • 【机器学习】处理样本不平衡的问题
  • Android前沿技术?Jetpack如何?
  • 为react项目添加开发/提交规范(前端工程化、eslint、prettier、husky、commitlint、stylelint)
  • 小研究 - MySQL 数据库安全加固技术的研究(一)
  • linux安装redis带图详细
  • MySql——数据库常用命令
  • 如何通过 WordPress 数据库启用插件?【进不去后台可用】
  • 芯片热处理设备 HTR-4立式4寸快速退火炉
  • 小研究 - 基于 MySQL 数据库的数据安全应用设计(一)
  • mysql转sqlite3
  • 在linux中使用 ./configure 时报错