当前位置: 首页 > news >正文

一种基于springboot、redis的分布式任务引擎的实现(一)

 总体思路是,主节点接收到任务请求,将根据任务情况拆分成多个任务块,将任务块标识的主键放入redis。发送redis消息,等待其他节点运行完毕,结束处理。接收到信息的节点注册本节点信息到redis、开启多线程、获取任务块、执行任务、结束处理。

1、主节点接收任务请求

    @Overridepublic void executeTaskInfo(PrepareDTO prepareDTO) {//异常标记String taskInfo = prepareDTO.getTaskId();//任务分组状态String taskStatus = "";try {log.info("数据准备任务并设定任务执行状态,{}", prepareDTO);this.dataPrepareBo.doStartGroupJobInfo(prepareDTO);//给redis集合中放计算对象log.info("开始放入计算任务:{}", prepareDTO);boolean getTaskFlag = this.dataPrepareBo.pushCalculationObject(prepareDTO);if (!getTaskFlag) {taskStatus = String.format("没有获取数据或计划已取消,%s", taskInfo);log.error(taskStatus);throw new Exception(taskStatus);}//发消息执行缓存中任务log.info("发消息执行任务:{}", prepareDTO);sendMessage(prepareDTO);//等待任务执行完毕log.info("等待任务执行结果");taskStatus = this.getGroupUpLoadTaskFinsh(prepareDTO);} catch (Exception e) {//捕获日志e.printStackTrace();taskStatus = "获取任务状态异常" + e;log.info(taskStatus);dataPrepareBo.putExceptionMsg2Cache(taskInfo, "数据准备分发计算任务线程异常:" + taskStatus);} finally {//做任务结束处理this.doGroupTaskFinshpPocess(prepareDTO, taskStatus);}}

2,发送消息

    @Overridepublic void sendMessage(String topic, String msg) {this.redisTemplate.convertAndSend(topic, msg);}

3,节点接收任务,并执行

    public void doUpLoadTask(String msg) throws Exception {log.info("开始执行明细任务{}" + msg);String taskId = this.getTaskId(msg);try {Object cancelFlag = this.redisTemplate.opsForValue().get(String.format(EngineConstant.JOB_CANCEL_FLAG, taskId));if(cancelFlag != null && "1".equals(cancelFlag.toString())){log.info("本次任务已取消");return;}//上传本机执行信息到redisthis.cacheBo.initGroupUpLoadTaskStats(taskId,ENGINE_DISTRIBUTION_RUNNING.getKey());//从缓存获取任务,获取任务后启线程执行任务。如果没获取到任务,则本节点任务执行完毕//循环获取任务this.groupTaskProcessBO.doGroupTaskProcess(taskId, null);//处理结束this.cacheBo.finishGroupUpLoadTaskStats(taskId,ENGINE_DISTRIBUTION_RUNNING.getKey());} catch (Exception e) {//记录日志taskUpldExeLogCDTO.setRunStas("-1");String exceptionInfo = this.taskLogUtils.getExceptionInfo(e) ;taskUpldExeLogCDTO.setAbnInfo(exceptionInfo);throw e;} finally {//记录日志taskUpldExeLogCDTO.setEndtime(DateUtil.getCurrentDate());if("-1".equals(taskUpldExeLogCDTO.getRunStas())){//异常结束this.taskLogUtils.sendLogInfo(taskUpldExeLogCDTO,"执行上传任务异常");} else {//正常结束taskUpldExeLogCDTO.setRunStas("1");this.taskLogUtils.sendLogInfo(taskUpldExeLogCDTO,"执行上传任务正常");}}}

4,开启线程执行任务

    @Overridepublic CalculationDTO doGroupTaskProcess(String taskId, TaskUpldExeLogCDTO taskUpldExeLogCDTO) throws Exception {List<Future> futureList = new ArrayList<>();//开始执行明细任务处理ThreadPoolTaskExecutor taskTransferExecutor = ToolUtil.getExecutor("engine-file-tasks-pool-", Math.min(parallelProcessNum,10), 8);ExecutorListHolder.putThreadPool(String.format(GroupConstant.PREPARE_ENGINE_POOL,taskId), taskTransferExecutor.getThreadPoolExecutor());for(int i = 0 ; i < parallelProcessNum ; i++) {DoGroupUpLoadTaskThread doGroupUpLoadTaskThread = new DoGroupUpLoadTaskThread(taskId, redisTemplate, calculationBo, null, null);Future<?> future = taskTransferExecutor.submit(doGroupUpLoadTaskThread);futureList.add(future);}if (!CollectionUtil.isEmpty(futureList)) {futureList.forEach(f -> {try {f.get(GroupTaskProcessBOImpl.maxTime, TimeUnit.SECONDS);} catch (Exception e) {e.printStackTrace();}});}log.info("本节点执行分组任务执行完毕{}", taskId + ":" + GroupConstant.IDENTITY);return null;}

5,线程执行明细

    @Overridepublic ResponseDTO call() throws Exception {//执行任务while(true) {FilterTableUniqueDTO filterTableUniqueDTO = (FilterTableUniqueDTO)this.redisTemplate.opsForList().leftPop(String.format(ENGINE_FILTERTABLEUNIQUE_QUEUE.getKey(), taskId));log.debug("取出任务:" + filterTableUniqueDTO);if(null == filterTableUniqueDTO) {break ;}long lastNum = this.redisTemplate.opsForList().size(String.format(ENGINE_FILTERTABLEUNIQUE_QUEUE.getKey(), taskId));log.info("生成文件剩余任务数量:" + lastNum);
//           处理任务calculationBo.GenerateFile(filterTableUniqueDTO, taskUpldDetlLogCDTO);}return null;}

以上是主要入口总体思路涉及代码,详细实现整理起来涉及内容比较繁多,将在第二部分分享。

http://www.lryc.cn/news/130231.html

相关文章:

  • 基于IDE Eval Resetter延长IntelliJ IDEA等软件试用期的方法(包含新版本软件的操作方法)
  • RocketMQ消费者可以手动消费但无法主动消费问题,或生成者发送超时
  • 【数据库系统】--【2】DBMS架构
  • 第三章 图论 No.13拓扑排序
  • 喜报 | 擎创再度入围IDC中国FinTech 50榜单
  • 【C++ 记忆站】引用
  • Hlang--用Python写个编程语言-变量的实现
  • 多维时序 | MATLAB实现PSO-CNN-BiLSTM多变量时间序列预测
  • 实现Java异步调用的高效方法
  • 批量提取文件名到excel,详细的提取步骤
  • C#中的泛型约束可以用在以下几个地方?
  • Linux Vm上部署Docker
  • ubuntu bind dns服务配置
  • 安卓的代码加固和其他安全问题
  • 关于Linux Docker springboot jar 日志时间不正确 问题解决
  • 提高批量爬虫工作效率
  • E96系列电阻阻值和代码、乘数对照表
  • 基于CentOS7.9安装部署docker(简洁版)
  • MySQL常用练手题目
  • Oracle字段长度不足位数补零
  • <数据结构与算法>二叉树堆的实现
  • FPGA:RS编码仿真过程
  • RocketMQ 5.0 架构解析:如何基于云原生架构支撑多元化场景
  • Android su
  • 微信小程序真机调试异常cmdId 1006, errCode-50011-已解决
  • 36.SpringMVC视图
  • LeetCode 热题 100(四):48. 旋转图像、240. 搜索二维矩阵 II、234. 回文链表
  • Qt 编译使用Bit7z库接口调用7z.dll、7-Zip.dll解压压缩常用Zip、ISO9660、Wim、Esd、7z等格式文件(二)
  • 224、仿真-基于51单片机音乐播放器流水灯控制Proteus仿真设计(程序+Proteus仿真+原理图+程序流程图+元器件清单+配套资料等)
  • 虹科展会 | 自动驾驶展品:上海汽车测试展精彩回顾