文件下载时利用redis的队列模式顺序下载文件,防止多文件任务下载导致OOM
1、controller层控制
@Resourceprivate RedissonClient redissonClient;@Slf4j
@Service
public class CustomerSettlementExportServiceImpl implements ICustomerSettlementExportService {
/*** 文件加入队列顺序导出** @param pubFileExportList 参数* @return 结果*/public AjaxResult pubFileExport(List<PubFileExport> pubFileExportList) {if (CollectionUtils.isEmpty(pubFileExportList)) {return AjaxResult.error("客户信息不能为空!");}RQueue<String> queue = redissonClient.getQueue("downloadQueue:file:export");for (PubFileExport pubFileExport : pubFileExportList) {try {queue.add(JSONObject.toJSONString(pubFileExport));} catch (Exception e) {log.error("加入队列失败=", e);return AjaxResult.error("添加导出任务到队列失败!");}}return AjaxResult.success();}
}
以下代码是通过定时任务拉取队列进行文件下载
@Slf4j
@Service
public class FileDownloadServiceImpl implements IFileDownloadService {@Resourceprivate ICustomerSettlementExportService customerSettlementExportService;@Resourceprivate IPubFileExportService pubFileExportService;@Resourceprivate RedisTemplate<String, String> redisTemplate;@Resourceprivate RedissonClient redissonClient;@Scheduled(fixedRate = 10000)public void processDownloadsInit() {// 提前过滤掉特定 IP 地址if (isLocalIp()) {return;}RLock lock = null;try {lock = redissonClient.getLock(BmsConstants.QUEUE_KEY_LOCK);boolean isOrderLock = lock.tryLock(10, TimeUnit.SECONDS);if (!isOrderLock) {// 当前任务未执行完,不能获取锁log.error("未能获取分布式锁,当前任务未执行完或有其他实例正在执行");return;}// 阻塞获取队列中的任务String exportTask = redisTemplate.opsForList().rightPop(BmsConstants.QUEUE_KEY, 10, TimeUnit.SECONDS);if (exportTask != null) {log.info("队列文件下载开始=" + DateUtils.getTime());handleDownload(exportTask);log.info("队列文件下载结束=" + DateUtils.getTime());}} catch (Exception e) {log.error("队列文件下载异常=", e);} finally {safeUnlock(lock);}}/*** 执行队列文件下载** @param exportTask 导出参数*/private void handleDownload(String exportTask) {try {PubFileExport pubFileExport = JSONObject.parseObject(exportTask, PubFileExport.class);// 实际的文件下载逻辑if (pubFileExport.getExportStartTime() == null|| pubFileExport.getExportEndTime() == null|| StringUtils.isBlank(pubFileExport.getClientCode())) {log.error("客户数据文件下载的参数为空请检查!");return;}AjaxResult ajaxResult = customerSettlementExportService.exportSheets(pubFileExport);pubFileExportService.updateExportAllExcel(ajaxResult, pubFileExport.getGid(), pubFileExport.getFinalUrl());} catch (Exception e) {log.error("执行队列文件下载异常=", e);}}/*** 检查是否为本地 IP 地址** @return 是否为本地 IP 地址*/private boolean isLocalIp() {try {InetAddress localAddress = InetAddress.getLocalHost();String ip = localAddress.getHostAddress();return ip.startsWith("192.168");} catch (Exception e) {log.error("获取本地 IP 地址异常=", e);return false;}}/*** 安全释放锁** @param lock 锁对象*/private void safeUnlock(RLock lock) {if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) {try {lock.unlock();} catch (Exception e) {log.error("释放锁异常=", e);}}}}
3、解析下代码
这段代码实现了文件下载任务的队列管理和处理。
主要功能包括:
任务入队:将多个文件导出任务加入Redis队列。
定时任务处理:每隔10秒从队列中取出任务并执行文件下载。
分布式锁控制:确保同一时间只有一个实例处理任务,避免重复执行。
本地IP过滤:防止本地IP地址触发任务。
控制流图(CFG)
flowchart TD
A[开始] --> B{是否为本地IP}
B -->|是| E[结束]
B -->|否| C[获取分布式锁]
C --> D{是否获取成功}
D -->|否| F[记录日志并结束]
D -->|是| G[从队列取任务]
G --> H{是否有任务}
H -->|否| I[释放锁并结束]
H -->|是| J[处理下载任务]
J --> K[记录开始时间]
K --> L[调用下载逻辑]
L --> M[记录结束时间]
M --> N[释放锁并结束]
详细说明
任务入队 (pubFileExport 方法):
检查任务列表是否为空。
将每个任务序列化后加入Redis队列。
定时任务处理 (processDownloadsInit 方法):
检查是否为本地IP地址,如果是则直接返回。
尝试获取分布式锁,如果失败则记录日志并返回。
从Redis队列中取出任务,如果有任务则调用处理方法。
记录任务开始和结束时间,并释放锁。
处理下载任务 (handleDownload 方法):
反序列化任务参数。
检查参数是否完整,如果不完整则记录错误并返回。
调用客户结算导出服务进行文件下载,并更新导出状态。
辅助方法:
isLocalIp:检查当前IP是否为本地IP。
safeUnlock:安全释放分布式锁。