当前位置: 首页 > news >正文

91-基于Spark的空气质量数据分析可视化系统

基于Spark的空气质量数据分析可视化系统设计与实现

项目概述

本项目是一个基于Apache Spark的大数据分析和可视化系统,专门用于空气质量数据的采集、分析、预测和可视化展示。系统采用分布式计算架构,结合机器学习算法,实现了对全国12个主要城市空气质量数据的全面分析和预测功能。

项目背景与意义

随着城市化进程的加快和工业化的快速发展,空气质量问题日益成为公众关注的焦点。传统的空气质量监测方式存在数据分散、分析效率低、可视化效果差等问题。本项目旨在构建一个完整的大数据分析和可视化平台,为空气质量监测提供科学、高效的技术解决方案。

项目目标

  1. 数据采集自动化:实现多城市空气质量数据的自动采集和更新
  2. 大数据分析:利用Spark分布式计算能力,处理大规模空气质量数据
  3. 智能预测:基于机器学习算法,预测空气质量变化趋势
  4. 可视化展示:提供直观、交互式的数据可视化界面
  5. 系统集成:构建完整的数据科学流程,从采集到展示

项目特色

  • 技术先进性:采用最新的Spark 3.x版本和机器学习技术
  • 架构完整性:涵盖数据采集、存储、分析、预测、可视化的完整流程
  • 扩展性强:支持新城市、新指标的快速接入
  • 用户友好:提供直观的Web界面和丰富的交互功能

项目演示

在这里插入图片描述

首页

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

项目演示视频

91-基于Spark的空气质量数据分析预测系统的设计与实现

技术架构

核心技术栈

后端框架
  • Django 3.1.14 - 成熟的Python Web应用框架,提供完整的MVT架构
  • Apache Spark 3.x - 分布式大数据处理引擎,支持内存计算和流处理
  • PySpark - Spark的Python API接口,提供DataFrame和SQL操作
数据库系统
  • MySQL 8.0 - 主数据库,存储分析结果、用户数据和系统配置
  • Apache Hive - 数据仓库,存储原始空气质量数据,支持SQL查询
  • SQLite - 本地开发数据库,轻量级,便于开发和测试
数据科学与机器学习
  • scikit-learn 1.3.2 - 机器学习算法库,提供分类、回归、聚类等算法
  • pandas 1.4.3 - 数据处理和分析库,提供DataFrame操作
  • numpy 1.23.1 - 数值计算库,提供高效的数组操作
  • matplotlib 3.5.2 - 数据可视化库,支持多种图表类型
数据采集技术
  • requests 2.31.0 - HTTP请求库,支持GET/POST请求和会话管理
  • BeautifulSoup4 4.12.3 - 网页解析库,支持HTML/XML解析
  • lxml 4.9.3 - XML/HTML解析器,提供高性能的解析能力
前端技术
  • Bootstrap 4.x - 响应式CSS框架,提供移动优先的设计
  • ECharts 5.x - 数据可视化图表库,支持多种交互式图表
  • jQuery 3.x - JavaScript库,简化DOM操作和AJAX请求
  • Material Design Icons - 现代化图标系统,提供丰富的图标资源
自然语言处理
  • jieba 0.42.1 - 中文分词库,支持精确模式和全模式分词
  • wordcloud 1.8.2.2 - 词云生成库,支持自定义形状和颜色
  • snownlp 0.12.3 - 中文自然语言处理库,提供情感分析等功能
大数据处理
  • PyHive 0.7.0 - Hive连接器,支持Python连接Hive数据库
  • thrift 0.21.0 - 跨语言RPC框架,用于Hive连接
  • mysqlclient 2.2.4 - MySQL数据库驱动,提供高性能的数据库连接

系统架构设计

整体架构图
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   数据采集层     │    │   数据存储层     │    │   数据处理层     │
│                 │    │                 │    │                 │
│ • 网络爬虫      │───▶│ • MySQL         │───▶│ • Apache Spark  │
│ • 数据清洗      │    │ • Hive          │    │ • PySpark       │
│ • 实时更新      │    │ • SQLite        │    │ • 分布式计算     │
│ • 数据验证      │    │ • 数据备份      │    │ • 内存计算       │
└─────────────────┘    └─────────────────┘    └─────────────────┘│
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   机器学习层     │◀───│   数据分析层     │◀───│   数据展示层     │
│                 │    │                 │    │                 │
│ • 线性回归      │    │ • 统计分析      │    │ • Web界面       │
│ • 模型训练      │    │ • 趋势分析      │    │ • 图表可视化     │
│ • 预测算法      │    │ • 关联分析      │    │ • 交互式展示     │
│ • 模型评估      │    │ • 聚类分析      │    │ • 响应式设计     │
└─────────────────┘    └─────────────────┘    └─────────────────┘
数据流向图
数据源网站 ──→ 爬虫程序 ──→ 数据清洗 ──→ Hive数据仓库│              │              │              │▼              ▼              ▼              ▼原始数据      结构化数据      清洗后数据      存储数据│              │              │              │▼              ▼              ▼              ▼
Spark分析 ──→ 分析结果 ──→ MySQL存储 ──→ Web展示│              │              │              │▼              ▼              ▼              ▼
机器学习 ──→ 预测模型 ──→ 预测结果 ──→ 可视化图表
技术架构特点

1. 分层架构设计

  • 表现层:Django Web框架,提供用户界面和交互
  • 业务逻辑层:Spark分布式计算,处理大规模数据分析
  • 数据访问层:多数据库支持,实现数据的高效存储和查询
  • 基础设施层:Hadoop生态系统,提供分布式存储和计算能力

2. 微服务架构思想

  • 数据采集服务:独立的爬虫服务,支持多城市并行采集
  • 数据分析服务:Spark集群服务,提供分布式计算能力
  • 机器学习服务:独立的模型训练和预测服务
  • Web展示服务:Django应用服务,提供用户界面

3. 数据流设计

  • 实时数据流:支持实时数据采集和处理
  • 批量数据流:支持大规模历史数据的批量分析
  • 流式处理:支持数据流的实时处理和响应

核心功能模块

1. 数据采集模块

功能特点
  • 多城市支持:支持12个主要城市的数据采集:北京、天津、上海、重庆、广州、深圳、杭州、成都、沈阳、南京、长沙、南昌
  • 历史数据获取:自动爬取历史空气质量数据,支持指定年份和月份的数据采集
  • 完整指标采集:采集完整的空气质量指标:AQI、PM2.5、PM10、SO2、NO2、CO、O3
  • 数据质量控制:实现数据去重、清洗和格式标准化,确保数据质量
  • 反爬虫策略:采用随机延时、User-Agent轮换等策略,避免被目标网站封禁
  • 错误处理机制:完善的异常处理和重试机制,确保数据采集的稳定性
技术实现细节

1. 爬虫架构设计

class AqiSpider:def __init__(self, cityname, realname):self.cityname = citynameself.realname = realname# 配置请求头,模拟真实浏览器self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36","Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8","Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3","Accept-Encoding": "gzip, deflate","Connection": "keep-alive","Upgrade-Insecure-Requests": "1"}# 配置CSV文件写入器self.f = open(f'data.csv', 'a', encoding='utf-8-sig', newline='')self.writer = csv.DictWriter(self.f, fieldnames=['city', 'date', 'airQuality', 'AQI', 'rank', 'PM2.5', 'PM10', 'So2', 'No2', 'Co', 'O3'])

2. 数据解析策略

def parse_response(self, response):soup = BeautifulSoup(response, 'html.parser')tr = soup.find_all('tr')for j in tr[1:]:  # 跳过表头td = j.find_all('td')if len(td) < 10:  # 数据完整性检查continue# 数据提取和清洗Date = td[0].get_text().strip()Quality_level = td[1].get_text().strip()AQI = td[2].get_text().strip()AQI_rank = td[3].get_text().strip()PM25 = td[4].get_text().strip()PM10 = td[5].get_text().strip()SO2 = td[6].get_text().strip()NO2 = td[7].get_text().strip()CO = td[8].get_text().strip()O3 = td[9].get_text().strip()# 数据验证和转换data_dict = self.validate_and_convert_data({'city': self.realname,'date': Date,'airQuality': Quality_level,'AQI': AQI,'rank': AQI_rank,'PM2.5': PM25,'PM10': PM10,'So2': SO2,'No2': NO2,'Co': CO,'O3': O3,})if data_dict:  # 只保存有效数据self.save_data(data_dict)

3. 数据验证和清洗

def validate_and_convert_data(self, data_dict):"""数据验证和类型转换"""try:# 验证日期格式if not self.is_valid_date(data_dict['date']):return None# 转换数值类型numeric_fields = ['AQI', 'PM2.5', 'PM10', 'So2', 'No2', 'Co', 'O3']for field in numeric_fields:value = data_dict[field]if value and value != '—':try:data_dict[field] = float(value)except ValueError:data_dict[field] = 0.0else:data_dict[field] = 0.0# 验证AQI范围if data_dict['AQI'] < 0 or data_dict['AQI'] > 500:return Nonereturn data_dictexcept Exception as e:print(f"数据验证失败: {e}")return Nonedef is_valid_date(self, date_str):"""验证日期格式"""try:from datetime import datetimedatetime.strptime(date_str, '%Y-%m-%d')return Trueexcept ValueError:return False

4. 错误处理和重试机制

def send_request_with_retry(self, year, month, max_retries=3):"""带重试机制的请求发送"""for attempt in range(max_retries):try:url = f"http://www.tianqihoubao.com/aqi/{self.cityname}-{year}{month:02d}.html"response = requests.get(url, headers=self.headers, timeout=60)if response.status_code == 200:return self.parse_response(response.text)else:print(f"请求失败,状态码: {response.status_code}")except requests.exceptions.RequestException as e:print(f"请求异常 (尝试 {attempt + 1}/{max_retries}): {e}")if attempt < max_retries - 1:time.sleep(2 ** attempt)  # 指数退避else:print(f"请求失败,已重试{max_retries}次")except Exception as e:print(f"未知错误: {e}")break

技术实现

import csv
import time
import requests
from bs4 import BeautifulSoupclass AqiSpider:def __init__(self, cityname, realname):self.cityname = citynameself.realname = realnameself.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36"}self.f = open(f'data.csv', 'a', encoding='utf-8-sig', newline='')self.writer = csv.DictWriter(self.f, fieldnames=['city', 'date', 'airQuality', 'AQI', 'rank', 'PM2.5', 'PM10', 'So2', 'No2', 'Co', 'O3'])def send_request(self, year, month):url = f"http://www.tianqihoubao.com/aqi/{self.cityname}-{year}{month:02d}.html"response = requests.get(url, headers=self.headers, timeout=60)time.sleep(2)  # 避免请求过于频繁print(f"响应状态码:{response.status_code}")return self.parse_response(response.text)def parse_response(self, response):soup = BeautifulSoup(response, 'html.parser')tr = soup.find_all('tr')for j in tr[1:]:  # 跳过表头td = j.find_all('td')Date = td[0].get_text().strip()  # 日期Quality_level = td[1].get_text().strip()  # 空气质量等级AQI = td[2].get_text().strip()AQI_rank = td[3].get_text().strip()PM25 = td[4].get_text().strip()PM10 = td[5].get_text().strip()SO2 = td[6].get_text().strip()NO2 = td[7].get_text().strip()CO = td[8].get_text().strip()O3 = td[9].get_text().strip()data_dict = {'city': self.realname,'date': Date,'airQuality': Quality_level,'AQI': AQI,'rank': AQI_rank,'PM2.5': PM25,'PM10': PM10,'So2': SO2,'No2': NO2,'Co': CO,'O3': O3,}self.save_data(data_dict)def save_data(self, data_dict):self.writer.writerow(data_dict)def run(self):for month in range(1, 6):print(f"正在爬取2025年{month}月的数据")self.send_request(2025, month)if __name__ == '__main__':cityList = ['beijing', 'tianjin', 'shanghai', 'chongqing', 'guangzhou', 'shenzhen', 'hangzhou', 'chengdu', 'shenyang', 'nanjing','changsha', 'nanchang']nameList = ['北京', '天津', '上海', '重庆', '广州', '深圳', '杭州', '成都', '沈阳', '南京', '长沙', '南昌']city_dict = dict(zip(cityList, nameList))for k, v in city_dict.items():AS = AqiSpider(k, v)AS.run()

2. Spark大数据分析模块

分析维度与业务价值

1. 城市平均AQI分析

  • 业务价值:识别空气质量最佳和最差的城市,为城市环境治理提供数据支撑
  • 分析指标:计算各城市平均空气质量指数,进行城市排名
  • 应用场景:城市环境评估、政策制定参考、公众健康指导

2. 六项污染物分析

  • 业务价值:全面了解各污染物的分布特征和贡献度
  • 分析指标:PM、PM10、SO2、NO2、CO、O3的统计分析和相关性研究
  • 应用场景:污染物来源分析、治理效果评估、健康风险评估

3. 时间序列分析

  • 业务价值:发现空气质量的时间变化规律和季节性特征
  • 分析指标:按年月分析AQI的最大值、最小值变化趋势
  • 应用场景:季节性污染预测、治理措施效果评估、公众出行指导

4. 污染物分布分析

  • 业务价值:了解污染物浓度的分布特征和超标情况
  • 分析指标:统计不同浓度区间的污染物分布情况
  • 应用场景:污染等级评估、预警阈值设定、治理目标制定
技术实现架构

1. Spark集群配置

# Spark会话配置
spark = SparkSession.builder.appName("sparkSQL").master("local[*]").\config("spark.sql.shuffle.partitions", 2). \config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse"). \config("hive.metastore.uris", "thrift://node1:9083"). \config("spark.sql.adaptive.enabled", "true"). \config("spark.sql.adaptive.coalescePartitions.enabled", "true"). \config("spark.sql.adaptive.skewJoin.enabled", "true"). \enableHiveSupport().\getOrCreate()# 性能优化配置
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

2. 数据预处理和清洗

def preprocess_air_data(spark):"""数据预处理和清洗"""# 读取原始数据airdata = spark.read.table("airdata")# 数据类型转换airdata = airdata.withColumn("date", airdata["date"].cast("date"))airdata = airdata.withColumn("AQI", airdata["AQI"].cast("double"))airdata = airdata.withColumn("PM", airdata["PM"].cast("double"))airdata = airdata.withColumn("PM10", airdata["PM10"].cast("double"))airdata = airdata.withColumn("So2", airdata["So2"].cast("double"))airdata = airdata.withColumn("No2", airdata["No2"].cast("double"))airdata = airdata.withColumn("Co", airdata["Co"].cast("double"))airdata = airdata.withColumn("O3", airdata["O3"].cast("double"))# 数据清洗:处理缺失值和异常值airdata = airdata.na.fill(0, subset=["AQI", "PM", "PM10", "So2", "No2", "Co", "O3"])# 异常值处理:AQI范围检查airdata = airdata.filter((col("AQI") >= 0) & (col("AQI") <= 500))return airdata

3. 高级分析功能

def advanced_analysis(airdata):"""高级分析功能"""# 1. 空气质量等级分析airdata = airdata.withColumn("air_quality_level",when(col("AQI") <= 50, "优").when(col("AQI") <= 100, "良").when(col("AQI") <= 150, "轻度污染").when(col("AQI") <= 200, "中度污染").when(col("AQI") <= 300, "重度污染").otherwise("严重污染"))# 2. 污染物相关性分析correlation_analysis = airdata.select(corr("AQI", "PM").alias("AQI_PM_corr"),corr("AQI", "PM10").alias("AQI_PM10_corr"),corr("AQI", "So2").alias("AQI_So2_corr"),corr("AQI", "No2").alias("AQI_No2_corr"),corr("AQI", "Co").alias("AQI_Co_corr"),corr("AQI", "O3").alias("AQI_O3_corr"))# 3. 季节性分析seasonal_analysis = airdata.groupby(year("date").alias("year"),month("date").alias("month")).agg(avg("AQI").alias("avg_aqi"),stddev("AQI").alias("std_aqi"),count("*").alias("data_count")).orderBy("year", "month")# 4. 城市聚类分析city_clustering = airdata.groupby("city").agg(avg("AQI").alias("avg_aqi"),avg("PM").alias("avg_pm"),avg("PM10").alias("avg_pm10"),avg("So2").alias("avg_so2"),avg("No2").alias("avg_no2"),avg("Co").alias("avg_co"),avg("O3").alias("avg_o3"))return correlation_analysis, seasonal_analysis, city_clustering

4. 性能优化策略

def optimize_spark_performance(spark):"""Spark性能优化配置"""# 1. 内存优化spark.conf.set("spark.sql.adaptive.enabled", "true")spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")# 2. 缓存策略spark.conf.set("spark.sql.crossJoin.enabled", "true")spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")# 3. 并行度优化spark.conf.set("spark.sql.shuffle.partitions", "200")spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")# 4. 数据倾斜处理spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")

5. 结果存储和导出

def save_analysis_results(results, spark):"""保存分析结果到不同存储系统"""# 1. 保存到MySQLfor table_name, data in results.items():data.write.mode("overwrite"). \format("jdbc"). \option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \option("dbtable", table_name). \option("user", "root"). \option("password", "root"). \option("encoding", "utf-8"). \save()# 2. 保存到Hivefor table_name, data in results.items():data.write.mode("overwrite").saveAsTable(table_name, "parquet")# 3. 导出为CSV文件for table_name, data in results.items():data.toPandas().to_csv(f"results/{table_name}.csv", index=False, encoding='utf-8-sig')

Spark SQL实现示例

#coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import count, mean, col, sum, when, year, month, max, min, avg
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatTypeif __name__ == '__main__':# 构建Spark会话spark = SparkSession.builder.appName("sparkSQL").master("local[*]").\config("spark.sql.shuffle.partitions", 2). \config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse"). \config("hive.metastore.uris", "thrift://node1:9083"). \enableHiveSupport().\getOrCreate()sc = spark.sparkContext# 读取数据airdata = spark.read.table("airdata")# 需求一:城市平均AQI分析result1 = airdata.groupby("city")\.agg(mean("AQI").alias("avg_AQI"))\.orderBy("avg_AQI", ascending=False)# 需求二:六项污染物分析result2 = airdata.groupby("city") \.agg(mean("PM").alias("avg_PM"),mean("PM10").alias("avg_PM10"),mean("So2").alias("avg_So2"),mean("No2").alias("avg_No2"),mean("Co").alias("avg_Co"),mean("O3").alias("avg_O3"))# 需求三:年度空气质量趋势分析airdata = airdata.withColumn("date", airdata["date"].cast("date"))result3 = airdata.groupby("city", year("date").alias("year"), month("date").alias("month"))\.agg(max("AQI").alias("max_AQI"),min("AQI").alias("min_AQI"))# 需求四:月度PM污染物分析result4 = airdata.groupby("city", year("date").alias("year"), month("date").alias("month")) \.agg(avg("PM").alias("max_PM"),avg("PM10").alias("min_PM10"))# 需求五:优质空气天数统计result5 = airdata.groupby("city", year("date").alias("year"), month("date").alias("month"))\.agg(count(when(airdata["AQI"] < 50, True)).alias("greatAirCount"))# 需求六:污染物最大值分析result6 = airdata.groupby("city")\.agg(max("So2").alias("max_So2"),max("No2").alias("max_No2"))# 需求七:CO浓度分级统计airdata = airdata.withColumn("Co_category",when((col("Co") >= 0) & (col("Co") < 0.25), '0-0.25').when((col("Co") >= 0.25) & (col("Co") < 0.5), '0.25-0.5').when((col("Co") >= 0.5) & (col("Co") < 0.75), '0.5-0.75').when((col("Co") >= 0.75) & (col("Co") < 1.0), '0.75-1').otherwise("1以上"))result7 = airdata.groupby("Co_category").agg(count('*').alias('Co_count'))# 需求八:O3浓度分级统计airdata = airdata.withColumn("O3_category",when((col("O3") >= 0) & (col("O3") < 25), '0-25').when((col("O3") >= 0.25) & (col("O3") < 50), '25-50').when((col("O3") >= 50) & (col("O3") < 75), '50-75').when((col("O3") >= 75) & (col("O3") < 100), '75-100').otherwise("100以上"))result8 = airdata.groupby("O3_category").agg(count('*').alias('O3_count'))# 保存结果到MySQLresult1.write.mode("overwrite"). \format("jdbc"). \option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \option("dbtable", "avgCityAqi"). \option("user", "root"). \option("password", "root"). \option("encoding", "utf-8"). \save()

3. 机器学习预测模块

预测模型设计与业务价值

1. 模型选择与理论基础

  • 线性回归模型:基于污染物浓度与AQI的线性关系,适合快速预测
  • 理论基础:AQI计算公式中各项污染物都有对应的权重系数
  • 业务价值:为环境监测和公众健康提供实时预测服务
  • 应用场景:空气质量预警、出行建议、健康防护指导

2. 特征工程与数据预处理

  • 特征选择:PM2.5、SO2、NO2、O3作为主要预测特征
  • 数据清洗:处理缺失值、异常值和数据标准化
  • 特征重要性:通过模型分析各污染物对AQI的贡献度
  • 数据质量:确保训练数据的准确性和完整性

3. 模型评估与优化

  • 评估指标:R²、MAE、RMSE等指标评估模型性能
  • 交叉验证:使用K折交叉验证确保模型泛化能力
  • 超参数调优:通过网格搜索优化模型参数
  • 模型监控:实时监控模型性能,及时更新模型
技术实现细节

1. 数据预处理与特征工程

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.ensemble import RandomForestRegressor
import joblibclass AirQualityPredictor:def __init__(self):self.scaler = StandardScaler()self.model = Noneself.feature_names = ['PM2_5', 'SO2', 'NO2', 'O3']def preprocess_data(self, data):"""数据预处理和特征工程"""# 1. 数据清洗data = data.dropna(subset=['AQI', 'PM2_5', 'SO2', 'NO2', 'O3'])# 2. 异常值处理data = data[(data['AQI'] >= 0) & (data['AQI'] <= 500)]data = data[(data['PM2_5'] >= 0) & (data['PM2_5'] <= 500)]data = data[(data['SO2'] >= 0) & (data['SO2'] <= 1000)]data = data[(data['NO2'] >= 0) & (data['NO2'] <= 400)]data = data[(data['O3'] >= 0) & (data['O3'] <= 300)]# 3. 特征工程# 添加时间特征data['date'] = pd.to_datetime(data['date'])data['year'] = data['date'].dt.yeardata['month'] = data['date'].dt.monthdata['day'] = data['date'].dt.daydata['day_of_week'] = data['date'].dt.dayofweek# 添加空气质量等级特征level_map = {'优': 1, '良': 2, '轻度污染': 3, '中度污染': 4, '重度污染': 5, '严重污染': 6}data['level_numeric'] = data['level'].map(level_map).fillna(0)# 添加污染物交互特征data['PM_SO2_ratio'] = data['PM2_5'] / (data['SO2'] + 1)data['NO2_O3_ratio'] = data['NO2'] / (data['O3'] + 1)return datadef feature_selection(self, data):"""特征选择"""# 基础特征base_features = ['PM2_5', 'SO2', 'NO2', 'O3']# 时间特征time_features = ['year', 'month', 'day', 'day_of_week']# 交互特征interaction_features = ['PM_SO2_ratio', 'NO2_O3_ratio']# 组合所有特征all_features = base_features + time_features + interaction_featuresreturn data[all_features], all_features

2. 模型训练与优化

def train_model(self, X_train, y_train):"""模型训练与优化"""# 1. 数据标准化X_train_scaled = self.scaler.fit_transform(X_train)# 2. 基础线性回归lr_model = LinearRegression()lr_model.fit(X_train_scaled, y_train)# 3. 正则化模型ridge_model = Ridge(alpha=1.0)lasso_model = Lasso(alpha=0.1)# 4. 随机森林模型rf_model = RandomForestRegressor(n_estimators=100, random_state=42)# 5. 模型评估models = {'LinearRegression': lr_model,'Ridge': ridge_model,'Lasso': lasso_model,'RandomForest': rf_model}best_model = Nonebest_score = -float('inf')for name, model in models.items():# 交叉验证cv_scores = cross_val_score(model, X_train_scaled, y_train, cv=5, scoring='r2')avg_score = cv_scores.mean()print(f"{name}: CV R² Score = {avg_score:.4f} (+/- {cv_scores.std() * 2:.4f})")if avg_score > best_score:best_score = avg_scorebest_model = model# 训练最佳模型best_model.fit(X_train_scaled, y_train)self.model = best_modelreturn best_model, best_scoredef hyperparameter_tuning(self, X_train, y_train):"""超参数调优"""# 定义参数网格param_grid = {'alpha': [0.001, 0.01, 0.1, 1.0, 10.0],'max_iter': [1000, 2000, 3000]}# 网格搜索grid_search = GridSearchCV(Ridge(),param_grid,cv=5,scoring='r2',n_jobs=-1)X_train_scaled = self.scaler.transform(X_train)grid_search.fit(X_train_scaled, y_train)print(f"最佳参数: {grid_search.best_params_}")print(f"最佳得分: {grid_search.best_score_:.4f}")return grid_search.best_estimator_

3. 模型评估与预测

def evaluate_model(self, X_test, y_test):"""模型评估"""X_test_scaled = self.scaler.transform(X_test)y_pred = self.model.predict(X_test_scaled)# 计算评估指标mse = mean_squared_error(y_test, y_pred)mae = mean_absolute_error(y_test, y_pred)r2 = r2_score(y_test, y_pred)rmse = np.sqrt(mse)print(f"模型评估结果:")print(f"R² Score: {r2:.4f}")print(f"MAE: {mae:.4f}")print(f"RMSE: {rmse:.4f}")print(f"MSE: {mse:.4f}")# 特征重要性分析if hasattr(self.model, 'feature_importances_'):feature_importance = pd.DataFrame({'feature': self.feature_names,'importance': self.model.feature_importances_}).sort_values('importance', ascending=False)print(f"\n特征重要性:")print(feature_importance)return {'r2': r2,'mae': mae,'rmse': rmse,'mse': mse,'predictions': y_pred}def predict_aqi(self, pm25, so2, no2, o3):"""实时AQI预测"""# 构建输入数据input_data = np.array([[pm25, so2, no2, o3]])# 数据标准化input_scaled = self.scaler.transform(input_data)# 预测prediction = self.model.predict(input_scaled)[0]# 预测结果解释aqi_level = self.get_aqi_level(prediction)return {'predicted_aqi': round(prediction, 2),'aqi_level': aqi_level,'confidence': self.get_prediction_confidence(input_scaled)}def get_aqi_level(self, aqi):"""获取AQI等级"""if aqi <= 50:return "优"elif aqi <= 100:return "良"elif aqi <= 150:return "轻度污染"elif aqi <= 200:return "中度污染"elif aqi <= 300:return "重度污染"else:return "严重污染"def get_prediction_confidence(self, input_data):"""获取预测置信度"""# 基于模型的不确定性估计if hasattr(self.model, 'predict_proba'):confidence = np.max(self.model.predict_proba(input_data))else:# 对于回归模型,使用预测值的合理性作为置信度prediction = self.model.predict(input_data)[0]confidence = max(0, min(1, 1 - abs(prediction - 100) / 100))return round(confidence, 3)

4. 模型持久化与部署

def save_model(self, model_path):"""保存模型"""model_data = {'model': self.model,'scaler': self.scaler,'feature_names': self.feature_names}joblib.dump(model_data, model_path)print(f"模型已保存到: {model_path}")def load_model(self, model_path):"""加载模型"""model_data = joblib.load(model_path)self.model = model_data['model']self.scaler = model_data['scaler']self.feature_names = model_data['feature_names']print(f"模型已从 {model_path} 加载")def batch_predict(self, data_batch):"""批量预测"""predictions = []for data in data_batch:pred = self.predict_aqi(data['PM2_5'], data['SO2'], data['NO2'], data['O3'])predictions.append(pred)return predictions

模型实现

import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from utils.query import querysdef getData():# 从数据库获取数据data = querys('select * from airData', [], 'select')print("数据库返回的列数:", len(data[0]) if data else 0)print("第一行数据:", data[0] if data else None)# 转换为DataFrame,根据实际数据调整列名products = pd.DataFrame(data, columns=['city', 'date', 'level', 'AQI', 'PM2_5', 'PM10', 'SO2', 'NO2', 'CO', 'O3', 'predict', 'extra'])# 处理空气质量等级,将文本转换为数值level_map = {'优': 1, '良': 2, '轻度污染': 3, '中度污染': 4, '重度污染': 5, '严重污染': 6}products['level'] = products['level'].map(level_map).fillna(0).astype('int')# 确保数值列为数值类型numeric_columns = ['AQI', 'PM2_5', 'PM10', 'SO2', 'NO2', 'CO', 'O3']for col in numeric_columns:products[col] = pd.to_numeric(products[col], errors='coerce').fillna(0).astype('float')return productsdef model_train(data):# 特征选择:PM2.5、SO2、NO2、O3x_train, x_test, y_train, y_test = train_test_split(data[["PM2_5", "SO2", "NO2", "O3"]], data['AQI'],test_size=0.25, random_state=1)model = LinearRegression()model.fit(x_train, y_train)return modeldef model_predict(model, data):return model.predict(data)def pred(model, *args):# 将输入参数转换为numpy数组data = np.array([args]).reshape(1, -1)pred = model.predict(data)return predif __name__ == '__main__':trainData = getData()model = model_train(trainData)print(pred(model, 10, 2, 18, 30))

4. 数据可视化模块

图表类型

  • 年度空气质量分析图表:展示年度AQI变化趋势和季节性规律
  • 月度分析图表:按月展示空气质量变化,识别污染高峰期
  • 气体分析图表:六项污染物的对比分析,展示污染物相关性
  • 城市分布图表:各城市空气质量对比,识别区域差异
  • 数据词云图:空气质量相关词汇的可视化展示

图表数据处理

def getIndexData(defaultCity):avgCityAqiList = list(getavgCityAqi())avgCitySixList = list(getavgCitySix())realSixList = querys('select * from avgCitySix where city = %s', [defaultCity], 'select')[0]yLine = list(realSixList[1:])yLine = [round(x, 1) for x in yLine]xLine = ['PM2.5', 'PM10', 'So2', 'No2', 'Co', 'O3']xBar = [x[0] for x in avgCityAqiList]yBar = [round(x[1], 0) for x in avgCityAqiList]return xBar, yBar, xLine, yLinedef getYearChartData(city):maxCityAqiList = querys('select * from maxCityAqi where city = %s', [city], 'select')y1DataM = [x[3] if x[3] is not None else 0 for x in maxCityAqiList]y1DataN = [x[4] if x[4] is not None else 0 for x in maxCityAqiList]xData = []for i in range(1, 13):xData.append(str(i) + '月')avgCityPMList = querys('select * from avgCityPM where city = %s', [city], 'select')y2Data2 = [round(x[3], 1) if x[3] is not None else 0 for x in avgCityPMList]y2Data10 = [round(x[4], 1) if x[4] is not None else 0 for x in avgCityPMList]return xData, y1DataM, y1DataN, y2Data2, y2Data10def getAirMonthData(city, month):airDataList = querys('select * from airData where city = %s and SUBSTRING(date,6,2) = %s', [city, month], 'select')# 按日期排序,确保日期格式正确airDataList = sorted(airDataList, key=lambda x: int(x[1].split('-')[2]))y1Data = [x[3] if x[3] is not None else 0 for x in airDataList]y2Data = [int(x[4]) if x[4] is not None else 0 for x in airDataList]dateList = [x[1] for x in airDataList]xData = []for date in dateList:year, month, day = date.split('-')xData.append(day)greatAirList = querys('select * from greatAir where city = %s', [city], 'select')funnelData = []for i in greatAirList:funnelData.append({'name': str(i[2]) + '月','value': i[3] if i[3] is not None else 0})return xData, y1Data, y2Data, funnelData

ECharts实现示例

// 年度AQI趋势图
var option = {title: { text: '年度空气质量趋势分析' },tooltip: { trigger: 'axis' },legend: {},toolbox: {show: true,feature: {dataZoom: { yAxisIndex: 'none' },dataView: { readOnly: false },magicType: { type: ['line', 'bar'] },restore: {},saveAsImage: {}}},xAxis: { type: 'category', boundaryGap: false,data: {{ xData | safe }}},yAxis: { type: 'value',axisLabel: { formatter: '{value} ' }},series: [{name: 'So2',type: 'line',data: {{ y1Data }},markPoint: {data: [{ type: 'max', name: 'Max' },{ type: 'min', name: 'Min' }]},markLine: {data: [{ type: 'average', name: 'Avg' }]}},{name: 'No2',type: 'line',data: {{ y2Data }},markPoint: {data: [{ name: '周最低', value: -2, xAxis: 1, yAxis: -1.5 }]},markLine: {data: [{ type: 'average', name: 'Avg' },[{symbol: 'none',x: '90%',yAxis: 'max'},{symbol: 'circle',label: {position: 'start',formatter: 'Max'},type: 'max',name: '最高点'}]]}}]
};

5. Web用户界面模块

界面功能

  • 用户注册登录系统
  • 个人信息管理
  • 数据总览表格展示
  • 交互式日期选择器
  • 响应式设计,支持多设备访问

数据模型设计

from django.db import modelsclass User(models.Model):id = models.AutoField("id", primary_key=True)username = models.CharField("用户名", max_length=255, default='')password = models.CharField("密码", max_length=255, default='')creteTime = models.DateField("创建时间", auto_now_add=True)class Meta:db_table = 'user'

数据库查询工具

from pyhive import hive
import time
from thrift.Thrift import TApplicationExceptiondef get_connection():try:return hive.Connection(host='node1', port='10000', username='hadoop')except Exception as e:print(f"连接Hive失败: {str(e)}")return Nonedef querys(sql, params, type='no_select', max_retries=3):retry_count = 0while retry_count < max_retries:try:conn = get_connection()if not conn:raise Exception("无法建立Hive连接")cursor = conn.cursor()params = tuple(params)cursor.execute(sql, params)if type != 'no_select':data_list = cursor.fetchall()conn.commit()return data_listelse:conn.commit()return '数据库语句执行成功'except TApplicationException as e:print(f"查询执行失败 (尝试 {retry_count + 1}/{max_retries}): {str(e)}")retry_count += 1if retry_count == max_retries:raise Exception(f"查询执行失败,已重试{max_retries}次: {str(e)}")time.sleep(2)  # 等待2秒后重试except Exception as e:print(f"发生错误: {str(e)}")raisefinally:try:if 'cursor' in locals():cursor.close()if 'conn' in locals():conn.close()except:pass

Django视图实现

def index(request):uname = request.session.get('username')userInfo = User.objects.get(username=uname)airdataList = list(getairdata())dateList = list(set([x[1] for x in airdataList]))# 提取年月日数据yearList = []monthList = []dayList = []for date in dateList:year, month, day = date.split('-')yearList.append(year)monthList.append(month)dayList.append(day)yearList = sorted(set(yearList))monthList = sorted(set(monthList))dayList = sorted(set(dayList))defaultYear = '2023'defaultMonth = '01'defaultDay = '01'cityList = list(set(x[0] for x in airdataList))if request.method == 'POST':defaultYear = request.POST.get('defaultYear')defaultMonth = request.POST.get('defaultMonth')defaultDay = request.POST.get('defaultDay')defaultCity = request.session.get('defaultCity')currentDate = defaultYear + '-' + defaultMonth + '-' + defaultDay# 获取图表数据xBar, yBar, xLine, yLine = getIndexData(defaultCity)currentData = querys('select * from airdata where city = %s and date = %s',[defaultCity, currentDate], 'select')[0]currentData = convert_none_to_null(currentData)return render(request, 'index.html', {'userInfo': userInfo,'yearList': yearList,'monthList': monthList,'dayList': dayList,'defaultYear': defaultYear,'defaultMonth': defaultMonth,'defaultDay': defaultDay,'cityList': cityList,'defaultCity': defaultCity,'currentData': currentData,'xBar': convert_none_to_null(xBar),'yBar': convert_none_to_null(yBar),'xLine': convert_none_to_null(xLine),'yLine': convert_none_to_null(yLine)})

系统特色功能

1. 分布式数据处理

  • 利用Spark的分布式计算能力,处理大规模空气质量数据
  • 支持实时数据流处理和批量数据分析
  • 实现数据并行计算,提高处理效率

2. 智能预测分析

  • 基于历史数据训练机器学习模型
  • 支持多维度特征工程和模型优化
  • 提供预测结果的可信度评估

3. 多维度数据可视化

  • 支持多种图表类型:折线图、柱状图、散点图、热力图等
  • 实现交互式数据探索和动态筛选
  • 提供图表导出和分享功能

4. 实时数据更新

  • 定时任务自动更新空气质量数据
  • 支持增量数据同步和全量数据更新
  • 确保数据的时效性和准确性

项目部署与运维

环境配置

# 安装Python依赖
pip install -r requirements.txt# 配置数据库
mysql -u root -p < design_91_airdata.sql# 启动Spark集群
spark-submit --master local[*] sparks/sparkAna.py# 启动Django应用
python manage.py runserver

系统要求

  • Python 3.8+
  • Apache Spark 3.0+
  • MySQL 8.0+
  • 内存:8GB+
  • 存储:100GB+

性能优化

1. 数据处理优化

  • 使用Spark的缓存机制减少重复计算
  • 优化SQL查询,使用索引提高查询效率
  • 实现数据分区,提高并行处理能力

2. 前端性能优化

  • 使用CDN加速静态资源加载
  • 实现图表懒加载和分页显示
  • 优化JavaScript代码,减少页面加载时间

3. 数据库优化

  • 建立合适的索引提高查询速度
  • 使用连接池管理数据库连接
  • 定期清理历史数据,保持数据库性能

项目成果

1. 技术成果

  • 成功构建了完整的大数据分析和可视化系统
  • 实现了从数据采集到预测分析的完整流程
  • 建立了可扩展的系统架构,支持功能扩展

2. 业务价值

  • 为空气质量监测提供了科学的数据分析工具
  • 支持多城市空气质量对比和趋势分析
  • 为环境决策提供了数据支撑

3. 用户体验

  • 提供了直观友好的用户界面
  • 支持多维度数据探索和可视化
  • 实现了响应式设计,适配多种设备

未来展望

1. 功能扩展

  • 增加更多城市的空气质量数据
  • 支持更多污染物指标的监测
  • 集成更多机器学习算法

2. 技术升级

  • 升级到Spark 3.x版本,利用新特性
  • 引入深度学习模型提高预测精度
  • 实现实时数据流处理

3. 用户体验优化

  • 开发移动端APP
  • 增加个性化推荐功能
  • 提供API接口供第三方调用

总结

本项目成功构建了一个基于Spark的空气质量数据分析可视化系统,实现了从数据采集、存储、分析、预测到可视化的完整数据科学流程。系统采用现代化的技术栈,具备良好的可扩展性和维护性,为空气质量监测和分析提供了强有力的技术支撑。

通过本项目的开发,不仅掌握了大数据处理、机器学习、Web开发等多项技术,更重要的是理解了如何将复杂的技术栈整合成一个完整的业务系统,为用户提供有价值的服务。


联系方式:如需源码等情况,欢迎主页联系

许可证:MIT License


http://www.lryc.cn/news/604594.html

相关文章:

  • neovim 怎么调用 clang-format进行格式化
  • 常⻅CMS漏洞
  • 《Flutter篇第二章》MasonryGridView瀑布流列表
  • 算法能力提升之快速矩阵
  • python反爬:一文掌握 undetected-chromedriver 的详细使用(可通过机器人验证)
  • Flutter封装模板及最佳实践
  • git本地仓库,工作区和暂存区的知识
  • 操作系统- lecture3(进程的定义)
  • RAG:检索增强生成的范式演进、技术突破与前沿挑战
  • 通义万相文生图模型wan2.2-t2i-flash和wan2.2-t2i-plus全维度深度对比
  • 通达OA服务器无公网IP网络,如何通过内网穿透实现外网远程办公访问OA系统
  • FIN1531 LVDS输出
  • SpringBoot升级2.5.3 2.6.8
  • Vue3 Composition API
  • 【LeetCode 热题 100】33. 搜索旋转排序数组——(解法二)一次二分
  • Kong API Gateway的十年进化史
  • Zookeeper符合cap中的AP还是CP
  • FPGA(或者数字电路)中组合逻辑和时序逻辑是怎么划分的
  • 域名https证书
  • 全栈(day1)
  • springboot本地访问https链接,证书错误
  • python基础语法1,python语法元素(简单易上手的python语法教学)(课后习题)
  • 深度学习(鱼书)day06--神经网络的学习(后两节)
  • 【自动化运维神器Ansible】Ansible常用模块之user模块详解
  • css初学者第二天
  • 认识RobotStudio的软件界面
  • Q2流动式起重机司机证理论考试真题
  • solidity 中 Eth 和 Usd 到底如何转换
  • 关于项目的一些完善功能
  • AD里面出现元器件PCB封装不能编辑的情况