Backend Optimization Strategies for Receiving Large Batches of Uploaded Images
Table of Contents
- 1. Chunked Receiving and Streaming
- 2. Asynchronous Processing
- 3. Memory Mapping and Temporary Files
- 4. Database Optimization
- 5. Caching Strategy
- 6. Compression and Format Optimization
- 7. Rate Limiting and Concurrency Control
- 8. Distributed Storage
- 9. Response Optimization
- 10. Monitoring and Error Handling
- 11. Database Connection Pool Tuning
1. Chunked Receiving and Streaming
Stream uploads to disk instead of loading every image into memory at once:

```python
import os

from flask import Flask, request
from werkzeug.utils import secure_filename

app = Flask(__name__)

@app.route('/upload', methods=['POST'])
def upload_images():
    uploaded_files = request.files.getlist("images")
    # Stream each file to disk instead of holding them all in memory
    for file in uploaded_files:
        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)
            # Save directly to disk without buffering the contents in memory
            file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
    return {'status': 'success', 'count': len(uploaded_files)}
```
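The handler above assumes an `allowed_file` helper that whitelists image extensions. A minimal sketch of such a helper follows; the extension set is illustrative, not from the original article:

```python
# Hypothetical helper assumed by the upload handler above.
# The set of allowed extensions is an illustrative assumption.
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"}

def allowed_file(filename):
    """Return True only for filenames with a whitelisted image extension."""
    return "." in filename and \
        filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
```

Note that extension checks are a first filter, not a security guarantee; validating the actual file contents is still advisable.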
2. Asynchronous Processing
Offload time-consuming work to an asynchronous task queue:

```python
import os

from celery import Celery
from flask import Flask, request
from werkzeug.utils import secure_filename

app = Flask(__name__)
celery = Celery(app.name, broker='redis://localhost:6379')

@celery.task
def process_images_task(file_paths):
    # Process images in the background (compression, format conversion, etc.)
    results = []
    for file_path in file_paths:
        result = process_single_image(file_path)  # processing logic
        results.append(result)
    return results

@app.route('/upload', methods=['POST'])
def upload_images():
    file_paths = []
    for file in request.files.getlist("images"):
        filename = secure_filename(file.filename)
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        file.save(file_path)
        file_paths.append(file_path)
    # Hand the saved files off to the background worker
    task = process_images_task.delay(file_paths)
    return {'status': 'success', 'task_id': task.id}
```
3. Memory Mapping and Temporary Files
Use memory mapping and temporary files to reduce memory usage:

```python
import mmap
import os
import tempfile

def process_large_image(file):
    # Spill the upload to a temporary file instead of loading it into memory
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        file.save(tmp_file.name)
    # Memory-map the file so large contents are paged in on demand
    with open(tmp_file.name, 'r+b') as f:
        with mmap.mmap(f.fileno(), 0) as mmapped_file:
            # Operate on the mapped file contents
            process_mapped_data(mmapped_file)
    # Clean up the temporary file
    os.unlink(tmp_file.name)
```
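As a self-contained illustration of the pattern above (temporary file plus memory map), this toy function, whose name and transformation are invented for the example, modifies file contents through `mmap` rather than reading the whole file into a Python byte string first:

```python
import mmap
import os
import tempfile

def uppercase_via_mmap(data: bytes) -> bytes:
    # Spill the payload to a temporary file first
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(data)
        path = tmp.name
    # Map the file and transform it in place; the OS pages bytes in on demand
    with open(path, "r+b") as f:
        with mmap.mmap(f.fileno(), 0) as mm:
            mm[:] = bytes(mm).upper()
            result = bytes(mm)
    os.unlink(path)
    return result
```

For real image work the transformation would be pixel processing rather than `upper()`, but the file-handling skeleton is the same.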
4. Database Optimization
Batch inserts and connection pool management:

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Create an engine with a connection pool
engine = create_engine(
    'postgresql://user:password@localhost/db',
    pool_size=20,
    max_overflow=30
)
Session = sessionmaker(bind=engine)

def batch_insert_image_records(image_data_list):
    session = Session()
    try:
        # Insert all records in a single batch
        session.bulk_insert_mappings(ImageModel, image_data_list)
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
```
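The batch-insert idea can be demonstrated with only the standard library. This sketch substitutes an in-memory SQLite database and `executemany` for the article's PostgreSQL and `bulk_insert_mappings`; the table name and columns are illustrative:

```python
import sqlite3

def batch_insert_records(rows):
    # In-memory SQLite stand-in for the PostgreSQL setup above
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE images (filename TEXT, size INTEGER)")
    # One round trip for all rows instead of one INSERT per image
    conn.executemany("INSERT INTO images VALUES (?, ?)", rows)
    conn.commit()
    count = conn.execute("SELECT COUNT(*) FROM images").fetchone()[0]
    conn.close()
    return count
```

The win is the same in both cases: a single batched statement avoids per-row statement and network overhead.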
5. Caching Strategy
Use a cache such as Redis to avoid reprocessing identical images:

```python
import hashlib
import json

import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def get_cached_result(file_hash):
    cached = redis_client.get(f"image_result:{file_hash}")
    return json.loads(cached) if cached else None

def cache_result(file_hash, result):
    redis_client.setex(
        f"image_result:{file_hash}",
        3600,  # expire after 1 hour
        json.dumps(result)
    )

def process_image_with_cache(file):
    file_content = file.read()
    file_hash = hashlib.md5(file_content).hexdigest()
    # Return the cached result if this image was seen before
    cached_result = get_cached_result(file_hash)
    if cached_result:
        return cached_result
    # Otherwise process the image and cache the result
    result = process_image_logic(file_content)
    cache_result(file_hash, result)
    return result
```
6. Compression and Format Optimization
Further optimize images on the server side:

```python
import io

from PIL import Image

def optimize_image(file, max_size=(1920, 1080), quality=85):
    image = Image.open(file)
    # Downscale in place while preserving aspect ratio
    image.thumbnail(max_size, Image.LANCZOS)
    # Re-encode as an optimized JPEG
    output = io.BytesIO()
    image.save(output, format='JPEG', quality=quality, optimize=True)
    output.seek(0)
    return output
```
7. Rate Limiting and Concurrency Control
Limit the number of concurrent requests:

```python
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(
    app,
    key_func=get_remote_address,
    default_limits=["100 per hour"]
)

@app.route('/upload', methods=['POST'])
@limiter.limit("10 per minute")
def upload_images():
    # Upload handling logic
    pass
```
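Rate limiting caps requests per client; concurrency control caps how many images are processed simultaneously on the server. A minimal standard-library sketch of the latter using a semaphore (the worker counts and the doubling placeholder are illustrative):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT_JOBS = 4
_slots = threading.BoundedSemaphore(MAX_CONCURRENT_JOBS)

def process_with_limit(item):
    # At most MAX_CONCURRENT_JOBS items run this section at the same time
    with _slots:
        return item * 2  # placeholder for real image processing

def process_all(items):
    with ThreadPoolExecutor(max_workers=8) as pool:
        return list(pool.map(process_with_limit, items))
```

Even with more pool workers than semaphore slots, the semaphore keeps the heavy section bounded, protecting CPU and memory under upload bursts.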
8. Distributed Storage
Store large numbers of images in distributed object storage:

```python
import boto3
from botocore.exceptions import ClientError
from werkzeug.utils import secure_filename

s3_client = boto3.client('s3')

def upload_to_s3(file, bucket, key):
    try:
        s3_client.upload_fileobj(file, bucket, key)
        return f"https://{bucket}.s3.amazonaws.com/{key}"
    except ClientError as e:
        print(f"Error uploading to S3: {e}")
        return None

def batch_upload_to_s3(files, bucket):
    urls = []
    for file in files:
        key = f"images/{secure_filename(file.filename)}"
        url = upload_to_s3(file, bucket, key)
        if url:
            urls.append(url)
    return urls
```
9. Response Optimization
Use streaming responses and compression:

```python
import json

from flask import Response, request

@app.route('/upload', methods=['POST'])
def upload_images_stream():
    def generate():
        yield '{"status": "processing", "files": ['
        files = request.files.getlist("images")
        for i, file in enumerate(files):
            # Process each file and stream its result immediately
            result = process_file(file)
            yield json.dumps(result)
            if i < len(files) - 1:
                yield ","
        yield ']}'
    return Response(generate(), mimetype='application/json')
```
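The code above shows streaming but not compression. One option is to gzip the JSON body and return it with a `Content-Encoding: gzip` header; the encoding step itself needs only the standard library (the function name is ours):

```python
import gzip
import json

def gzip_json(payload: dict) -> bytes:
    # Serialize to JSON, then gzip the bytes for the response body
    raw = json.dumps(payload).encode("utf-8")
    return gzip.compress(raw)
```

In practice, response compression is often delegated to a reverse proxy such as nginx rather than done in application code.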
10. Monitoring and Error Handling
Integrate monitoring and error-handling mechanisms:

```python
import logging

from prometheus_client import Counter, Histogram

# Define monitoring metrics
upload_counter = Counter('image_uploads_total', 'Total image uploads')
upload_duration = Histogram('image_upload_duration_seconds', 'Image upload duration')

@app.route('/upload', methods=['POST'])
@upload_duration.time()
def upload_images():
    try:
        files = request.files.getlist("images")
        upload_counter.inc(len(files))
        # Handle the upload
        results = process_files(files)
        return {'status': 'success', 'count': len(results)}
    except Exception as e:
        logging.error(f"Upload error: {e}")
        return {'status': 'error', 'message': str(e)}, 500
```
11. Database Connection Pool Tuning
Tune the database connection pool configuration:

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    'postgresql://user:password@localhost/db',
    poolclass=QueuePool,
    pool_size=20,        # base pool size
    max_overflow=30,     # extra connections allowed beyond pool_size
    pool_recycle=3600,   # recycle connections after this many seconds
    pool_pre_ping=True,  # verify connections before handing them out
    pool_timeout=30      # seconds to wait for a free connection
)
```