91-基于Spark的空气质量数据分析可视化系统
基于Spark的空气质量数据分析可视化系统设计与实现
项目概述
本项目是一个基于Apache Spark的大数据分析和可视化系统,专门用于空气质量数据的采集、分析、预测和可视化展示。系统采用分布式计算架构,结合机器学习算法,实现了对全国12个主要城市空气质量数据的全面分析和预测功能。
项目背景与意义
随着城市化进程的加快和工业化的快速发展,空气质量问题日益成为公众关注的焦点。传统的空气质量监测方式存在数据分散、分析效率低、可视化效果差等问题。本项目旨在构建一个完整的大数据分析和可视化平台,为空气质量监测提供科学、高效的技术解决方案。
项目目标
- 数据采集自动化:实现多城市空气质量数据的自动采集和更新
- 大数据分析:利用Spark分布式计算能力,处理大规模空气质量数据
- 智能预测:基于机器学习算法,预测空气质量变化趋势
- 可视化展示:提供直观、交互式的数据可视化界面
- 系统集成:构建完整的数据科学流程,从采集到展示
项目特色
- 技术先进性:采用最新的Spark 3.x版本和机器学习技术
- 架构完整性:涵盖数据采集、存储、分析、预测、可视化的完整流程
- 扩展性强:支持新城市、新指标的快速接入
- 用户友好:提供直观的Web界面和丰富的交互功能
项目演示
项目演示视频
91-基于Spark的空气质量数据分析预测系统的设计与实现
技术架构
核心技术栈
后端框架
- Django 3.1.14 - 成熟的Python Web应用框架,提供完整的MVT架构
- Apache Spark 3.x - 分布式大数据处理引擎,支持内存计算和流处理
- PySpark - Spark的Python API接口,提供DataFrame和SQL操作
数据库系统
- MySQL 8.0 - 主数据库,存储分析结果、用户数据和系统配置
- Apache Hive - 数据仓库,存储原始空气质量数据,支持SQL查询
- SQLite - 本地开发数据库,轻量级,便于开发和测试
数据科学与机器学习
- scikit-learn 1.3.2 - 机器学习算法库,提供分类、回归、聚类等算法
- pandas 1.4.3 - 数据处理和分析库,提供DataFrame操作
- numpy 1.23.1 - 数值计算库,提供高效的数组操作
- matplotlib 3.5.2 - 数据可视化库,支持多种图表类型
数据采集技术
- requests 2.31.0 - HTTP请求库,支持GET/POST请求和会话管理
- BeautifulSoup4 4.12.3 - 网页解析库,支持HTML/XML解析
- lxml 4.9.3 - XML/HTML解析器,提供高性能的解析能力
前端技术
- Bootstrap 4.x - 响应式CSS框架,提供移动优先的设计
- ECharts 5.x - 数据可视化图表库,支持多种交互式图表
- jQuery 3.x - JavaScript库,简化DOM操作和AJAX请求
- Material Design Icons - 现代化图标系统,提供丰富的图标资源
自然语言处理
- jieba 0.42.1 - 中文分词库,支持精确模式和全模式分词
- wordcloud 1.8.2.2 - 词云生成库,支持自定义形状和颜色
- snownlp 0.12.3 - 中文自然语言处理库,提供情感分析等功能
大数据处理
- PyHive 0.7.0 - Hive连接器,支持Python连接Hive数据库
- thrift 0.21.0 - 跨语言RPC框架,用于Hive连接
- mysqlclient 2.2.4 - MySQL数据库驱动,提供高性能的数据库连接
系统架构设计
整体架构图
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 数据采集层 │ │ 数据存储层 │ │ 数据处理层 │
│ │ │ │ │ │
│ • 网络爬虫 │───▶│ • MySQL │───▶│ • Apache Spark │
│ • 数据清洗 │ │ • Hive │ │ • PySpark │
│ • 实时更新 │ │ • SQLite │ │ • 分布式计算 │
│ • 数据验证 │ │ • 数据备份 │ │ • 内存计算 │
└─────────────────┘ └─────────────────┘ └─────────────────┘│
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 机器学习层 │◀───│ 数据分析层 │◀───│ 数据展示层 │
│ │ │ │ │ │
│ • 线性回归 │ │ • 统计分析 │ │ • Web界面 │
│ • 模型训练 │ │ • 趋势分析 │ │ • 图表可视化 │
│ • 预测算法 │ │ • 关联分析 │ │ • 交互式展示 │
│ • 模型评估 │ │ • 聚类分析 │ │ • 响应式设计 │
└─────────────────┘ └─────────────────┘ └─────────────────┘
数据流向图
数据源网站 ──→ 爬虫程序 ──→ 数据清洗 ──→ Hive数据仓库│ │ │ │▼ ▼ ▼ ▼原始数据 结构化数据 清洗后数据 存储数据│ │ │ │▼ ▼ ▼ ▼
Spark分析 ──→ 分析结果 ──→ MySQL存储 ──→ Web展示│ │ │ │▼ ▼ ▼ ▼
机器学习 ──→ 预测模型 ──→ 预测结果 ──→ 可视化图表
技术架构特点
1. 分层架构设计
- 表现层:Django Web框架,提供用户界面和交互
- 业务逻辑层:Spark分布式计算,处理大规模数据分析
- 数据访问层:多数据库支持,实现数据的高效存储和查询
- 基础设施层:Hadoop生态系统,提供分布式存储和计算能力
2. 微服务架构思想
- 数据采集服务:独立的爬虫服务,支持多城市并行采集
- 数据分析服务:Spark集群服务,提供分布式计算能力
- 机器学习服务:独立的模型训练和预测服务
- Web展示服务:Django应用服务,提供用户界面
3. 数据流设计
- 实时数据流:支持实时数据采集和处理
- 批量数据流:支持大规模历史数据的批量分析
- 流式处理:支持数据流的实时处理和响应
核心功能模块
1. 数据采集模块
功能特点
- 多城市支持:支持12个主要城市的数据采集:北京、天津、上海、重庆、广州、深圳、杭州、成都、沈阳、南京、长沙、南昌
- 历史数据获取:自动爬取历史空气质量数据,支持指定年份和月份的数据采集
- 完整指标采集:采集完整的空气质量指标:AQI、PM2.5、PM10、SO2、NO2、CO、O3
- 数据质量控制:实现数据去重、清洗和格式标准化,确保数据质量
- 反爬虫策略:采用随机延时、User-Agent轮换等策略,避免被目标网站封禁
- 错误处理机制:完善的异常处理和重试机制,确保数据采集的稳定性
技术实现细节
1. 爬虫架构设计
class AqiSpider:def __init__(self, cityname, realname):self.cityname = citynameself.realname = realname# 配置请求头,模拟真实浏览器self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36","Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8","Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3","Accept-Encoding": "gzip, deflate","Connection": "keep-alive","Upgrade-Insecure-Requests": "1"}# 配置CSV文件写入器self.f = open(f'data.csv', 'a', encoding='utf-8-sig', newline='')self.writer = csv.DictWriter(self.f, fieldnames=['city', 'date', 'airQuality', 'AQI', 'rank', 'PM2.5', 'PM10', 'So2', 'No2', 'Co', 'O3'])
2. 数据解析策略
def parse_response(self, response):soup = BeautifulSoup(response, 'html.parser')tr = soup.find_all('tr')for j in tr[1:]: # 跳过表头td = j.find_all('td')if len(td) < 10: # 数据完整性检查continue# 数据提取和清洗Date = td[0].get_text().strip()Quality_level = td[1].get_text().strip()AQI = td[2].get_text().strip()AQI_rank = td[3].get_text().strip()PM25 = td[4].get_text().strip()PM10 = td[5].get_text().strip()SO2 = td[6].get_text().strip()NO2 = td[7].get_text().strip()CO = td[8].get_text().strip()O3 = td[9].get_text().strip()# 数据验证和转换data_dict = self.validate_and_convert_data({'city': self.realname,'date': Date,'airQuality': Quality_level,'AQI': AQI,'rank': AQI_rank,'PM2.5': PM25,'PM10': PM10,'So2': SO2,'No2': NO2,'Co': CO,'O3': O3,})if data_dict: # 只保存有效数据self.save_data(data_dict)
3. 数据验证和清洗
def validate_and_convert_data(self, data_dict):"""数据验证和类型转换"""try:# 验证日期格式if not self.is_valid_date(data_dict['date']):return None# 转换数值类型numeric_fields = ['AQI', 'PM2.5', 'PM10', 'So2', 'No2', 'Co', 'O3']for field in numeric_fields:value = data_dict[field]if value and value != '—':try:data_dict[field] = float(value)except ValueError:data_dict[field] = 0.0else:data_dict[field] = 0.0# 验证AQI范围if data_dict['AQI'] < 0 or data_dict['AQI'] > 500:return Nonereturn data_dictexcept Exception as e:print(f"数据验证失败: {e}")return Nonedef is_valid_date(self, date_str):"""验证日期格式"""try:from datetime import datetimedatetime.strptime(date_str, '%Y-%m-%d')return Trueexcept ValueError:return False
4. 错误处理和重试机制
def send_request_with_retry(self, year, month, max_retries=3):"""带重试机制的请求发送"""for attempt in range(max_retries):try:url = f"http://www.tianqihoubao.com/aqi/{self.cityname}-{year}{month:02d}.html"response = requests.get(url, headers=self.headers, timeout=60)if response.status_code == 200:return self.parse_response(response.text)else:print(f"请求失败,状态码: {response.status_code}")except requests.exceptions.RequestException as e:print(f"请求异常 (尝试 {attempt + 1}/{max_retries}): {e}")if attempt < max_retries - 1:time.sleep(2 ** attempt) # 指数退避else:print(f"请求失败,已重试{max_retries}次")except Exception as e:print(f"未知错误: {e}")break
技术实现
import csv
import time
import requests
from bs4 import BeautifulSoupclass AqiSpider:def __init__(self, cityname, realname):self.cityname = citynameself.realname = realnameself.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36"}self.f = open(f'data.csv', 'a', encoding='utf-8-sig', newline='')self.writer = csv.DictWriter(self.f, fieldnames=['city', 'date', 'airQuality', 'AQI', 'rank', 'PM2.5', 'PM10', 'So2', 'No2', 'Co', 'O3'])def send_request(self, year, month):url = f"http://www.tianqihoubao.com/aqi/{self.cityname}-{year}{month:02d}.html"response = requests.get(url, headers=self.headers, timeout=60)time.sleep(2) # 避免请求过于频繁print(f"响应状态码:{response.status_code}")return self.parse_response(response.text)def parse_response(self, response):soup = BeautifulSoup(response, 'html.parser')tr = soup.find_all('tr')for j in tr[1:]: # 跳过表头td = j.find_all('td')Date = td[0].get_text().strip() # 日期Quality_level = td[1].get_text().strip() # 空气质量等级AQI = td[2].get_text().strip()AQI_rank = td[3].get_text().strip()PM25 = td[4].get_text().strip()PM10 = td[5].get_text().strip()SO2 = td[6].get_text().strip()NO2 = td[7].get_text().strip()CO = td[8].get_text().strip()O3 = td[9].get_text().strip()data_dict = {'city': self.realname,'date': Date,'airQuality': Quality_level,'AQI': AQI,'rank': AQI_rank,'PM2.5': PM25,'PM10': PM10,'So2': SO2,'No2': NO2,'Co': CO,'O3': O3,}self.save_data(data_dict)def save_data(self, data_dict):self.writer.writerow(data_dict)def run(self):for month in range(1, 6):print(f"正在爬取2025年{month}月的数据")self.send_request(2025, month)if __name__ == '__main__':cityList = ['beijing', 'tianjin', 'shanghai', 'chongqing', 'guangzhou', 'shenzhen', 'hangzhou', 'chengdu', 'shenyang', 'nanjing','changsha', 'nanchang']nameList = ['北京', '天津', '上海', '重庆', '广州', '深圳', '杭州', '成都', '沈阳', '南京', '长沙', '南昌']city_dict = dict(zip(cityList, nameList))for k, v in city_dict.items():AS = AqiSpider(k, v)AS.run()
2. Spark大数据分析模块
分析维度与业务价值
1. 城市平均AQI分析
- 业务价值:识别空气质量最佳和最差的城市,为城市环境治理提供数据支撑
- 分析指标:计算各城市平均空气质量指数,进行城市排名
- 应用场景:城市环境评估、政策制定参考、公众健康指导
2. 六项污染物分析
- 业务价值:全面了解各污染物的分布特征和贡献度
- 分析指标:PM、PM10、SO2、NO2、CO、O3的统计分析和相关性研究
- 应用场景:污染物来源分析、治理效果评估、健康风险评估
3. 时间序列分析
- 业务价值:发现空气质量的时间变化规律和季节性特征
- 分析指标:按年月分析AQI的最大值、最小值变化趋势
- 应用场景:季节性污染预测、治理措施效果评估、公众出行指导
4. 污染物分布分析
- 业务价值:了解污染物浓度的分布特征和超标情况
- 分析指标:统计不同浓度区间的污染物分布情况
- 应用场景:污染等级评估、预警阈值设定、治理目标制定
技术实现架构
1. Spark集群配置
# Spark会话配置
spark = SparkSession.builder.appName("sparkSQL").master("local[*]").\config("spark.sql.shuffle.partitions", 2). \config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse"). \config("hive.metastore.uris", "thrift://node1:9083"). \config("spark.sql.adaptive.enabled", "true"). \config("spark.sql.adaptive.coalescePartitions.enabled", "true"). \config("spark.sql.adaptive.skewJoin.enabled", "true"). \enableHiveSupport().\getOrCreate()# 性能优化配置
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
2. 数据预处理和清洗
def preprocess_air_data(spark):"""数据预处理和清洗"""# 读取原始数据airdata = spark.read.table("airdata")# 数据类型转换airdata = airdata.withColumn("date", airdata["date"].cast("date"))airdata = airdata.withColumn("AQI", airdata["AQI"].cast("double"))airdata = airdata.withColumn("PM", airdata["PM"].cast("double"))airdata = airdata.withColumn("PM10", airdata["PM10"].cast("double"))airdata = airdata.withColumn("So2", airdata["So2"].cast("double"))airdata = airdata.withColumn("No2", airdata["No2"].cast("double"))airdata = airdata.withColumn("Co", airdata["Co"].cast("double"))airdata = airdata.withColumn("O3", airdata["O3"].cast("double"))# 数据清洗:处理缺失值和异常值airdata = airdata.na.fill(0, subset=["AQI", "PM", "PM10", "So2", "No2", "Co", "O3"])# 异常值处理:AQI范围检查airdata = airdata.filter((col("AQI") >= 0) & (col("AQI") <= 500))return airdata
3. 高级分析功能
def advanced_analysis(airdata):"""高级分析功能"""# 1. 空气质量等级分析airdata = airdata.withColumn("air_quality_level",when(col("AQI") <= 50, "优").when(col("AQI") <= 100, "良").when(col("AQI") <= 150, "轻度污染").when(col("AQI") <= 200, "中度污染").when(col("AQI") <= 300, "重度污染").otherwise("严重污染"))# 2. 污染物相关性分析correlation_analysis = airdata.select(corr("AQI", "PM").alias("AQI_PM_corr"),corr("AQI", "PM10").alias("AQI_PM10_corr"),corr("AQI", "So2").alias("AQI_So2_corr"),corr("AQI", "No2").alias("AQI_No2_corr"),corr("AQI", "Co").alias("AQI_Co_corr"),corr("AQI", "O3").alias("AQI_O3_corr"))# 3. 季节性分析seasonal_analysis = airdata.groupby(year("date").alias("year"),month("date").alias("month")).agg(avg("AQI").alias("avg_aqi"),stddev("AQI").alias("std_aqi"),count("*").alias("data_count")).orderBy("year", "month")# 4. 城市聚类分析city_clustering = airdata.groupby("city").agg(avg("AQI").alias("avg_aqi"),avg("PM").alias("avg_pm"),avg("PM10").alias("avg_pm10"),avg("So2").alias("avg_so2"),avg("No2").alias("avg_no2"),avg("Co").alias("avg_co"),avg("O3").alias("avg_o3"))return correlation_analysis, seasonal_analysis, city_clustering
4. 性能优化策略
def optimize_spark_performance(spark):"""Spark性能优化配置"""# 1. 内存优化spark.conf.set("spark.sql.adaptive.enabled", "true")spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")# 2. 缓存策略spark.conf.set("spark.sql.crossJoin.enabled", "true")spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")# 3. 并行度优化spark.conf.set("spark.sql.shuffle.partitions", "200")spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")# 4. 数据倾斜处理spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
5. 结果存储和导出
def save_analysis_results(results, spark):"""保存分析结果到不同存储系统"""# 1. 保存到MySQLfor table_name, data in results.items():data.write.mode("overwrite"). \format("jdbc"). \option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \option("dbtable", table_name). \option("user", "root"). \option("password", "root"). \option("encoding", "utf-8"). \save()# 2. 保存到Hivefor table_name, data in results.items():data.write.mode("overwrite").saveAsTable(table_name, "parquet")# 3. 导出为CSV文件for table_name, data in results.items():data.toPandas().to_csv(f"results/{table_name}.csv", index=False, encoding='utf-8-sig')
Spark SQL实现示例
#coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import count, mean, col, sum, when, year, month, max, min, avg
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatTypeif __name__ == '__main__':# 构建Spark会话spark = SparkSession.builder.appName("sparkSQL").master("local[*]").\config("spark.sql.shuffle.partitions", 2). \config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse"). \config("hive.metastore.uris", "thrift://node1:9083"). \enableHiveSupport().\getOrCreate()sc = spark.sparkContext# 读取数据airdata = spark.read.table("airdata")# 需求一:城市平均AQI分析result1 = airdata.groupby("city")\.agg(mean("AQI").alias("avg_AQI"))\.orderBy("avg_AQI", ascending=False)# 需求二:六项污染物分析result2 = airdata.groupby("city") \.agg(mean("PM").alias("avg_PM"),mean("PM10").alias("avg_PM10"),mean("So2").alias("avg_So2"),mean("No2").alias("avg_No2"),mean("Co").alias("avg_Co"),mean("O3").alias("avg_O3"))# 需求三:年度空气质量趋势分析airdata = airdata.withColumn("date", airdata["date"].cast("date"))result3 = airdata.groupby("city", year("date").alias("year"), month("date").alias("month"))\.agg(max("AQI").alias("max_AQI"),min("AQI").alias("min_AQI"))# 需求四:月度PM污染物分析result4 = airdata.groupby("city", year("date").alias("year"), month("date").alias("month")) \.agg(avg("PM").alias("max_PM"),avg("PM10").alias("min_PM10"))# 需求五:优质空气天数统计result5 = airdata.groupby("city", year("date").alias("year"), month("date").alias("month"))\.agg(count(when(airdata["AQI"] < 50, True)).alias("greatAirCount"))# 需求六:污染物最大值分析result6 = airdata.groupby("city")\.agg(max("So2").alias("max_So2"),max("No2").alias("max_No2"))# 需求七:CO浓度分级统计airdata = airdata.withColumn("Co_category",when((col("Co") >= 0) & (col("Co") < 0.25), '0-0.25').when((col("Co") >= 0.25) & (col("Co") < 0.5), '0.25-0.5').when((col("Co") >= 0.5) & (col("Co") < 0.75), '0.5-0.75').when((col("Co") >= 0.75) & (col("Co") < 1.0), '0.75-1').otherwise("1以上"))result7 = airdata.groupby("Co_category").agg(count('*').alias('Co_count'))# 需求八:O3浓度分级统计airdata = airdata.withColumn("O3_category",when((col("O3") >= 0) & (col("O3") < 25), '0-25').when((col("O3") >= 0.25) & (col("O3") < 50), '25-50').when((col("O3") >= 50) & (col("O3") < 75), '50-75').when((col("O3") >= 75) & (col("O3") < 100), '75-100').otherwise("100以上"))result8 = airdata.groupby("O3_category").agg(count('*').alias('O3_count'))# 保存结果到MySQLresult1.write.mode("overwrite"). \format("jdbc"). \option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \option("dbtable", "avgCityAqi"). \option("user", "root"). \option("password", "root"). \option("encoding", "utf-8"). \save()
3. 机器学习预测模块
预测模型设计与业务价值
1. 模型选择与理论基础
- 线性回归模型:基于污染物浓度与AQI的线性关系,适合快速预测
- 理论基础:AQI计算公式中各项污染物都有对应的权重系数
- 业务价值:为环境监测和公众健康提供实时预测服务
- 应用场景:空气质量预警、出行建议、健康防护指导
2. 特征工程与数据预处理
- 特征选择:PM2.5、SO2、NO2、O3作为主要预测特征
- 数据清洗:处理缺失值、异常值和数据标准化
- 特征重要性:通过模型分析各污染物对AQI的贡献度
- 数据质量:确保训练数据的准确性和完整性
3. 模型评估与优化
- 评估指标:R²、MAE、RMSE等指标评估模型性能
- 交叉验证:使用K折交叉验证确保模型泛化能力
- 超参数调优:通过网格搜索优化模型参数
- 模型监控:实时监控模型性能,及时更新模型
技术实现细节
1. 数据预处理与特征工程
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.ensemble import RandomForestRegressor
import joblibclass AirQualityPredictor:def __init__(self):self.scaler = StandardScaler()self.model = Noneself.feature_names = ['PM2_5', 'SO2', 'NO2', 'O3']def preprocess_data(self, data):"""数据预处理和特征工程"""# 1. 数据清洗data = data.dropna(subset=['AQI', 'PM2_5', 'SO2', 'NO2', 'O3'])# 2. 异常值处理data = data[(data['AQI'] >= 0) & (data['AQI'] <= 500)]data = data[(data['PM2_5'] >= 0) & (data['PM2_5'] <= 500)]data = data[(data['SO2'] >= 0) & (data['SO2'] <= 1000)]data = data[(data['NO2'] >= 0) & (data['NO2'] <= 400)]data = data[(data['O3'] >= 0) & (data['O3'] <= 300)]# 3. 特征工程# 添加时间特征data['date'] = pd.to_datetime(data['date'])data['year'] = data['date'].dt.yeardata['month'] = data['date'].dt.monthdata['day'] = data['date'].dt.daydata['day_of_week'] = data['date'].dt.dayofweek# 添加空气质量等级特征level_map = {'优': 1, '良': 2, '轻度污染': 3, '中度污染': 4, '重度污染': 5, '严重污染': 6}data['level_numeric'] = data['level'].map(level_map).fillna(0)# 添加污染物交互特征data['PM_SO2_ratio'] = data['PM2_5'] / (data['SO2'] + 1)data['NO2_O3_ratio'] = data['NO2'] / (data['O3'] + 1)return datadef feature_selection(self, data):"""特征选择"""# 基础特征base_features = ['PM2_5', 'SO2', 'NO2', 'O3']# 时间特征time_features = ['year', 'month', 'day', 'day_of_week']# 交互特征interaction_features = ['PM_SO2_ratio', 'NO2_O3_ratio']# 组合所有特征all_features = base_features + time_features + interaction_featuresreturn data[all_features], all_features
2. 模型训练与优化
def train_model(self, X_train, y_train):"""模型训练与优化"""# 1. 数据标准化X_train_scaled = self.scaler.fit_transform(X_train)# 2. 基础线性回归lr_model = LinearRegression()lr_model.fit(X_train_scaled, y_train)# 3. 正则化模型ridge_model = Ridge(alpha=1.0)lasso_model = Lasso(alpha=0.1)# 4. 随机森林模型rf_model = RandomForestRegressor(n_estimators=100, random_state=42)# 5. 模型评估models = {'LinearRegression': lr_model,'Ridge': ridge_model,'Lasso': lasso_model,'RandomForest': rf_model}best_model = Nonebest_score = -float('inf')for name, model in models.items():# 交叉验证cv_scores = cross_val_score(model, X_train_scaled, y_train, cv=5, scoring='r2')avg_score = cv_scores.mean()print(f"{name}: CV R² Score = {avg_score:.4f} (+/- {cv_scores.std() * 2:.4f})")if avg_score > best_score:best_score = avg_scorebest_model = model# 训练最佳模型best_model.fit(X_train_scaled, y_train)self.model = best_modelreturn best_model, best_scoredef hyperparameter_tuning(self, X_train, y_train):"""超参数调优"""# 定义参数网格param_grid = {'alpha': [0.001, 0.01, 0.1, 1.0, 10.0],'max_iter': [1000, 2000, 3000]}# 网格搜索grid_search = GridSearchCV(Ridge(),param_grid,cv=5,scoring='r2',n_jobs=-1)X_train_scaled = self.scaler.transform(X_train)grid_search.fit(X_train_scaled, y_train)print(f"最佳参数: {grid_search.best_params_}")print(f"最佳得分: {grid_search.best_score_:.4f}")return grid_search.best_estimator_
3. 模型评估与预测
def evaluate_model(self, X_test, y_test):"""模型评估"""X_test_scaled = self.scaler.transform(X_test)y_pred = self.model.predict(X_test_scaled)# 计算评估指标mse = mean_squared_error(y_test, y_pred)mae = mean_absolute_error(y_test, y_pred)r2 = r2_score(y_test, y_pred)rmse = np.sqrt(mse)print(f"模型评估结果:")print(f"R² Score: {r2:.4f}")print(f"MAE: {mae:.4f}")print(f"RMSE: {rmse:.4f}")print(f"MSE: {mse:.4f}")# 特征重要性分析if hasattr(self.model, 'feature_importances_'):feature_importance = pd.DataFrame({'feature': self.feature_names,'importance': self.model.feature_importances_}).sort_values('importance', ascending=False)print(f"\n特征重要性:")print(feature_importance)return {'r2': r2,'mae': mae,'rmse': rmse,'mse': mse,'predictions': y_pred}def predict_aqi(self, pm25, so2, no2, o3):"""实时AQI预测"""# 构建输入数据input_data = np.array([[pm25, so2, no2, o3]])# 数据标准化input_scaled = self.scaler.transform(input_data)# 预测prediction = self.model.predict(input_scaled)[0]# 预测结果解释aqi_level = self.get_aqi_level(prediction)return {'predicted_aqi': round(prediction, 2),'aqi_level': aqi_level,'confidence': self.get_prediction_confidence(input_scaled)}def get_aqi_level(self, aqi):"""获取AQI等级"""if aqi <= 50:return "优"elif aqi <= 100:return "良"elif aqi <= 150:return "轻度污染"elif aqi <= 200:return "中度污染"elif aqi <= 300:return "重度污染"else:return "严重污染"def get_prediction_confidence(self, input_data):"""获取预测置信度"""# 基于模型的不确定性估计if hasattr(self.model, 'predict_proba'):confidence = np.max(self.model.predict_proba(input_data))else:# 对于回归模型,使用预测值的合理性作为置信度prediction = self.model.predict(input_data)[0]confidence = max(0, min(1, 1 - abs(prediction - 100) / 100))return round(confidence, 3)
4. 模型持久化与部署
def save_model(self, model_path):"""保存模型"""model_data = {'model': self.model,'scaler': self.scaler,'feature_names': self.feature_names}joblib.dump(model_data, model_path)print(f"模型已保存到: {model_path}")def load_model(self, model_path):"""加载模型"""model_data = joblib.load(model_path)self.model = model_data['model']self.scaler = model_data['scaler']self.feature_names = model_data['feature_names']print(f"模型已从 {model_path} 加载")def batch_predict(self, data_batch):"""批量预测"""predictions = []for data in data_batch:pred = self.predict_aqi(data['PM2_5'], data['SO2'], data['NO2'], data['O3'])predictions.append(pred)return predictions
模型实现
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from utils.query import querysdef getData():# 从数据库获取数据data = querys('select * from airData', [], 'select')print("数据库返回的列数:", len(data[0]) if data else 0)print("第一行数据:", data[0] if data else None)# 转换为DataFrame,根据实际数据调整列名products = pd.DataFrame(data, columns=['city', 'date', 'level', 'AQI', 'PM2_5', 'PM10', 'SO2', 'NO2', 'CO', 'O3', 'predict', 'extra'])# 处理空气质量等级,将文本转换为数值level_map = {'优': 1, '良': 2, '轻度污染': 3, '中度污染': 4, '重度污染': 5, '严重污染': 6}products['level'] = products['level'].map(level_map).fillna(0).astype('int')# 确保数值列为数值类型numeric_columns = ['AQI', 'PM2_5', 'PM10', 'SO2', 'NO2', 'CO', 'O3']for col in numeric_columns:products[col] = pd.to_numeric(products[col], errors='coerce').fillna(0).astype('float')return productsdef model_train(data):# 特征选择:PM2.5、SO2、NO2、O3x_train, x_test, y_train, y_test = train_test_split(data[["PM2_5", "SO2", "NO2", "O3"]], data['AQI'],test_size=0.25, random_state=1)model = LinearRegression()model.fit(x_train, y_train)return modeldef model_predict(model, data):return model.predict(data)def pred(model, *args):# 将输入参数转换为numpy数组data = np.array([args]).reshape(1, -1)pred = model.predict(data)return predif __name__ == '__main__':trainData = getData()model = model_train(trainData)print(pred(model, 10, 2, 18, 30))
4. 数据可视化模块
图表类型
- 年度空气质量分析图表:展示年度AQI变化趋势和季节性规律
- 月度分析图表:按月展示空气质量变化,识别污染高峰期
- 气体分析图表:六项污染物的对比分析,展示污染物相关性
- 城市分布图表:各城市空气质量对比,识别区域差异
- 数据词云图:空气质量相关词汇的可视化展示
图表数据处理
def getIndexData(defaultCity):avgCityAqiList = list(getavgCityAqi())avgCitySixList = list(getavgCitySix())realSixList = querys('select * from avgCitySix where city = %s', [defaultCity], 'select')[0]yLine = list(realSixList[1:])yLine = [round(x, 1) for x in yLine]xLine = ['PM2.5', 'PM10', 'So2', 'No2', 'Co', 'O3']xBar = [x[0] for x in avgCityAqiList]yBar = [round(x[1], 0) for x in avgCityAqiList]return xBar, yBar, xLine, yLinedef getYearChartData(city):maxCityAqiList = querys('select * from maxCityAqi where city = %s', [city], 'select')y1DataM = [x[3] if x[3] is not None else 0 for x in maxCityAqiList]y1DataN = [x[4] if x[4] is not None else 0 for x in maxCityAqiList]xData = []for i in range(1, 13):xData.append(str(i) + '月')avgCityPMList = querys('select * from avgCityPM where city = %s', [city], 'select')y2Data2 = [round(x[3], 1) if x[3] is not None else 0 for x in avgCityPMList]y2Data10 = [round(x[4], 1) if x[4] is not None else 0 for x in avgCityPMList]return xData, y1DataM, y1DataN, y2Data2, y2Data10def getAirMonthData(city, month):airDataList = querys('select * from airData where city = %s and SUBSTRING(date,6,2) = %s', [city, month], 'select')# 按日期排序,确保日期格式正确airDataList = sorted(airDataList, key=lambda x: int(x[1].split('-')[2]))y1Data = [x[3] if x[3] is not None else 0 for x in airDataList]y2Data = [int(x[4]) if x[4] is not None else 0 for x in airDataList]dateList = [x[1] for x in airDataList]xData = []for date in dateList:year, month, day = date.split('-')xData.append(day)greatAirList = querys('select * from greatAir where city = %s', [city], 'select')funnelData = []for i in greatAirList:funnelData.append({'name': str(i[2]) + '月','value': i[3] if i[3] is not None else 0})return xData, y1Data, y2Data, funnelData
ECharts实现示例
// 年度AQI趋势图
var option = {title: { text: '年度空气质量趋势分析' },tooltip: { trigger: 'axis' },legend: {},toolbox: {show: true,feature: {dataZoom: { yAxisIndex: 'none' },dataView: { readOnly: false },magicType: { type: ['line', 'bar'] },restore: {},saveAsImage: {}}},xAxis: { type: 'category', boundaryGap: false,data: {{ xData | safe }}},yAxis: { type: 'value',axisLabel: { formatter: '{value} ' }},series: [{name: 'So2',type: 'line',data: {{ y1Data }},markPoint: {data: [{ type: 'max', name: 'Max' },{ type: 'min', name: 'Min' }]},markLine: {data: [{ type: 'average', name: 'Avg' }]}},{name: 'No2',type: 'line',data: {{ y2Data }},markPoint: {data: [{ name: '周最低', value: -2, xAxis: 1, yAxis: -1.5 }]},markLine: {data: [{ type: 'average', name: 'Avg' },[{symbol: 'none',x: '90%',yAxis: 'max'},{symbol: 'circle',label: {position: 'start',formatter: 'Max'},type: 'max',name: '最高点'}]]}}]
};
5. Web用户界面模块
界面功能
- 用户注册登录系统
- 个人信息管理
- 数据总览表格展示
- 交互式日期选择器
- 响应式设计,支持多设备访问
数据模型设计
from django.db import modelsclass User(models.Model):id = models.AutoField("id", primary_key=True)username = models.CharField("用户名", max_length=255, default='')password = models.CharField("密码", max_length=255, default='')creteTime = models.DateField("创建时间", auto_now_add=True)class Meta:db_table = 'user'
数据库查询工具
from pyhive import hive
import time
from thrift.Thrift import TApplicationExceptiondef get_connection():try:return hive.Connection(host='node1', port='10000', username='hadoop')except Exception as e:print(f"连接Hive失败: {str(e)}")return Nonedef querys(sql, params, type='no_select', max_retries=3):retry_count = 0while retry_count < max_retries:try:conn = get_connection()if not conn:raise Exception("无法建立Hive连接")cursor = conn.cursor()params = tuple(params)cursor.execute(sql, params)if type != 'no_select':data_list = cursor.fetchall()conn.commit()return data_listelse:conn.commit()return '数据库语句执行成功'except TApplicationException as e:print(f"查询执行失败 (尝试 {retry_count + 1}/{max_retries}): {str(e)}")retry_count += 1if retry_count == max_retries:raise Exception(f"查询执行失败,已重试{max_retries}次: {str(e)}")time.sleep(2) # 等待2秒后重试except Exception as e:print(f"发生错误: {str(e)}")raisefinally:try:if 'cursor' in locals():cursor.close()if 'conn' in locals():conn.close()except:pass
Django视图实现
def index(request):uname = request.session.get('username')userInfo = User.objects.get(username=uname)airdataList = list(getairdata())dateList = list(set([x[1] for x in airdataList]))# 提取年月日数据yearList = []monthList = []dayList = []for date in dateList:year, month, day = date.split('-')yearList.append(year)monthList.append(month)dayList.append(day)yearList = sorted(set(yearList))monthList = sorted(set(monthList))dayList = sorted(set(dayList))defaultYear = '2023'defaultMonth = '01'defaultDay = '01'cityList = list(set(x[0] for x in airdataList))if request.method == 'POST':defaultYear = request.POST.get('defaultYear')defaultMonth = request.POST.get('defaultMonth')defaultDay = request.POST.get('defaultDay')defaultCity = request.session.get('defaultCity')currentDate = defaultYear + '-' + defaultMonth + '-' + defaultDay# 获取图表数据xBar, yBar, xLine, yLine = getIndexData(defaultCity)currentData = querys('select * from airdata where city = %s and date = %s',[defaultCity, currentDate], 'select')[0]currentData = convert_none_to_null(currentData)return render(request, 'index.html', {'userInfo': userInfo,'yearList': yearList,'monthList': monthList,'dayList': dayList,'defaultYear': defaultYear,'defaultMonth': defaultMonth,'defaultDay': defaultDay,'cityList': cityList,'defaultCity': defaultCity,'currentData': currentData,'xBar': convert_none_to_null(xBar),'yBar': convert_none_to_null(yBar),'xLine': convert_none_to_null(xLine),'yLine': convert_none_to_null(yLine)})
系统特色功能
1. 分布式数据处理
- 利用Spark的分布式计算能力,处理大规模空气质量数据
- 支持实时数据流处理和批量数据分析
- 实现数据并行计算,提高处理效率
2. 智能预测分析
- 基于历史数据训练机器学习模型
- 支持多维度特征工程和模型优化
- 提供预测结果的可信度评估
3. 多维度数据可视化
- 支持多种图表类型:折线图、柱状图、散点图、热力图等
- 实现交互式数据探索和动态筛选
- 提供图表导出和分享功能
4. 实时数据更新
- 定时任务自动更新空气质量数据
- 支持增量数据同步和全量数据更新
- 确保数据的时效性和准确性
项目部署与运维
环境配置
# 安装Python依赖
pip install -r requirements.txt# 配置数据库
mysql -u root -p < design_91_airdata.sql# 启动Spark集群
spark-submit --master local[*] sparks/sparkAna.py# 启动Django应用
python manage.py runserver
系统要求
- Python 3.8+
- Apache Spark 3.0+
- MySQL 8.0+
- 内存:8GB+
- 存储:100GB+
性能优化
1. 数据处理优化
- 使用Spark的缓存机制减少重复计算
- 优化SQL查询,使用索引提高查询效率
- 实现数据分区,提高并行处理能力
2. 前端性能优化
- 使用CDN加速静态资源加载
- 实现图表懒加载和分页显示
- 优化JavaScript代码,减少页面加载时间
3. 数据库优化
- 建立合适的索引提高查询速度
- 使用连接池管理数据库连接
- 定期清理历史数据,保持数据库性能
项目成果
1. 技术成果
- 成功构建了完整的大数据分析和可视化系统
- 实现了从数据采集到预测分析的完整流程
- 建立了可扩展的系统架构,支持功能扩展
2. 业务价值
- 为空气质量监测提供了科学的数据分析工具
- 支持多城市空气质量对比和趋势分析
- 为环境决策提供了数据支撑
3. 用户体验
- 提供了直观友好的用户界面
- 支持多维度数据探索和可视化
- 实现了响应式设计,适配多种设备
未来展望
1. 功能扩展
- 增加更多城市的空气质量数据
- 支持更多污染物指标的监测
- 集成更多机器学习算法
2. 技术升级
- 升级到Spark 3.x版本,利用新特性
- 引入深度学习模型提高预测精度
- 实现实时数据流处理
3. 用户体验优化
- 开发移动端APP
- 增加个性化推荐功能
- 提供API接口供第三方调用
总结
本项目成功构建了一个基于Spark的空气质量数据分析可视化系统,实现了从数据采集、存储、分析、预测到可视化的完整数据科学流程。系统采用现代化的技术栈,具备良好的可扩展性和维护性,为空气质量监测和分析提供了强有力的技术支撑。
通过本项目的开发,不仅掌握了大数据处理、机器学习、Web开发等多项技术,更重要的是理解了如何将复杂的技术栈整合成一个完整的业务系统,为用户提供有价值的服务。
联系方式:如需源码等情况,欢迎主页联系
许可证:MIT License