当前位置: 首页 > news >正文

分布式定时任务:Elastic-Job-Lite

Elastic-Job-Lite 是一款由 Apache 开源的轻量级分布式任务调度框架,属于 ShardingSphere 生态体系的一部分。它专注于分布式任务调度,支持弹性伸缩、分片处理、高可用等特性,且不依赖中心化架构。

一、基础

(一)核心特性
  1. 分布式协调
    通过 ZooKeeper 实现作业的分布式调度和协调,确保任务在集群环境中不重复、不遗漏地执行。

  2. 分片机制
    支持将任务拆分为多个分片(Sharding)并行执行,提升处理效率。例如:

    // 根据分片参数处理不同数据
    int shardIndex = context.getShardingItem();  // 分片索引(0,1,2...)
    String shardParam = context.getShardingParameter();  // 分片参数
    
  3. 弹性伸缩
    动态感知集群节点变化,自动重新分配分片。新增节点时,分片会被均匀分配到新节点;节点下线时,其分片会被其他节点接管。

  4. 多种作业类型

    • SimpleJob:简单任务,实现 SimpleJob 接口即可。
    • DataflowJob:数据流任务,支持数据抓取(fetch)和处理(process)。
    • ScriptJob:脚本任务,支持 Shell、Python 等脚本语言。
  5. 失效转移
    当作业节点崩溃时,正在执行的分片会被转移到其他节点继续执行。

  6. 幂等性保障
    通过 ZooKeeper 实现分布式锁,确保同一分片在同一时间只被一个节点执行。

(二)架构设计

Elastic-Job-Lite 采用去中心化架构:

  • 作业节点:直接部署在应用中,既是执行节点也是调度节点。
  • 注册中心:依赖 ZooKeeper 存储作业元数据和运行状态。
  • 无中心化调度器:每个节点通过注册中心协调,无需单独的调度中心。
(三)核心概念
  1. 作业(Job)
    任务的抽象,支持 Simple、Dataflow、Script 三种类型。

  2. 分片(Sharding)
    将任务拆分为多个独立的子任务,每个分片由不同的节点执行。例如:

    elasticjob:jobs:myJob:sharding-total-count: 3  # 总分片数sharding-item-parameters: "0=北京,1=上海,2=广州"  # 分片参数
    
  3. 注册中心(Registry Center)
    ZooKeeper 作为协调服务,存储作业配置、运行状态和分片信息。

  4. 作业实例(Job Instance)
    每个作业节点启动时会向注册中心注册自己,成为一个作业实例。

二、在springboot中使用Elastic-Job-Lite

(一)添加依赖

pom.xml 中添加 Elastic-Job-Lite 和 ZooKeeper 客户端依赖:

<!-- Elastic-Job-Lite Spring Boot Starter -->
<dependency><groupId>org.apache.shardingsphere.elasticjob</groupId><artifactId>elasticjob-lite-spring-boot-starter</artifactId><version>3.0.3</version> <!-- 最新稳定版本 -->
</dependency><!-- ZooKeeper 客户端 -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.3.0</version>
</dependency>
(二) 配置 ZooKeeper 注册中心

你提到的没错!Elastic-Job-Lite 配置 ZooKeeper 确实有三种主要方式,我之前的回答集中在 Java 代码配置 上。现在我补充完整另外两种方式:

1.YAML 配置(Spring Boot 自动配置)

最简洁的方式,通过 application.yml 配置:

elasticjob:reg-center:server-lists: localhost:2181  # ZooKeeper 地址namespace: elastic-job        # 命名空间base-sleep-time-milliseconds: 1000  # 初始重试等待时间max-sleep-time-milliseconds: 3000  # 最大重试等待时间max-retries: 3                # 最大重试次数digest: ""                    # 认证信息(可选)jobs:mySimpleJob:type: SIMPLEclass: com.example.job.MySimpleJob  # 作业类路径cron: "0/10 * * * * ?"              # Cron 表达式sharding-total-count: 3             # 分片总数sharding-item-parameters: "0=A,1=B,2=C"  # 分片参数overwrite: true                     # 覆盖注册中心配置

关键点

  • elasticjob.reg-center 配置 ZooKeeper 连接信息。
  • elasticjob.jobs 下定义具体作业,支持 SIMPLEDATAFLOWSCRIPT 等类型。
2.Java 代码配置(手动构建 Bean)

前面示例中使用的方式,适合需要灵活控制配置的场景:

@Configuration
public class JobConfig {@Bean(initMethod = "init")public ZookeeperRegistryCenter regCenter() {ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("localhost:2181", "elastic-job");return new ZookeeperRegistryCenter(zkConfig);}@Bean(initMethod = "init")public SpringJobScheduler simpleJobScheduler(MySimpleJob mySimpleJob, ZookeeperRegistryCenter regCenter) {JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("mySimpleJob", "0/10 * * * * ?", 3).shardingItemParameters("0=A,1=B,2=C").build();SimpleJobConfiguration jobConfig = new SimpleJobConfiguration(coreConfig, MySimpleJob.class.getCanonicalName());return new SpringJobScheduler(mySimpleJob, regCenter, LiteJobConfiguration.newBuilder(jobConfig).overwrite(true).build());}
}

关键点

  • 手动创建 ZookeeperRegistryCenterSpringJobScheduler Bean。
  • 通过 JobCoreConfigurationSimpleJobConfiguration 构建作业配置。
(三)创建简单作业类

实现 SimpleJob 接口,定义作业逻辑:

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.springframework.stereotype.Component;@Component
public class MySimpleJob implements SimpleJob {@Overridepublic void execute(ShardingContext shardingContext) {// 获取分片信息int shardIndex = shardingContext.getShardingItem();String shardParam = shardingContext.getShardingParameter();// 作业逻辑(根据分片参数处理不同数据)System.out.printf("分片项: %d, 参数: %s, 时间: %s%n", shardIndex, shardParam, System.currentTimeMillis());// 示例:根据分片处理不同的数据// if (shardIndex == 0) { processGroupA(); }// else if (shardIndex == 1) { processGroupB(); }}
}
(四)配置作业

使用 @ElasticSimpleJob 注解配置作业:

import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.elasticjob.api.ElasticSimpleJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class JobConfig {@Autowiredprivate ZookeeperRegistryCenter regCenter;@Autowiredprivate MySimpleJob mySimpleJob;@Bean(initMethod = "init")public SpringJobScheduler simpleJobScheduler() {// 定义作业核心配置JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("mySimpleJob",      // 作业名称"0/10 * * * * ?",   // Cron 表达式3                   // 分片总数).shardingItemParameters("0=A,1=B,2=C")  // 分片参数.build();// 定义 Simple 作业配置SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, MySimpleJob.class.getCanonicalName());// 定义 Lite 作业配置LiteJobConfiguration jobConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true)  // 允许覆盖注册中心配置.build();// 创建作业调度器return new SpringJobScheduler(mySimpleJob, regCenter, jobConfig);}
}
(五)配置说明
参数说明
reg-center.server-listsZooKeeper 服务器地址,多个地址用逗号分隔(如 host1:2181,host2:2181
reg-center.namespace命名空间,用于隔离不同项目的作业配置
coreConfig.cronCron 表达式,定义作业执行时间规则
coreConfig.shardingTotalCount分片总数,决定作业拆分为多少个并行执行单元
coreConfig.shardingItemParameters分片参数,格式为 0=A,1=B,2=C,为每个分片指定参数
http://www.lryc.cn/news/578827.html

相关文章:

  • GC393低功耗双电压比较器:精准、高效的信号处理解决方案
  • Axure版ArcoDesign 组件库-免费版
  • OpenCV CUDA模块设备层-----高效地计算两个uint 类型值的平均值函数vavg2()
  • Centos系统及国产麒麟系统设置自己写的go服务的开机启动项完整教程
  • 开源 | V3.1.1慧知开源重卡运营充电桩平台 - 重卡运营充电桩平台管理解决方案;企业级完整代码 多租户、模拟器、多运营商、多小程序;
  • Chrome 下载文件时总是提示“已阻止不安全的下载”的解决方案
  • DQL-1-基础查询
  • 技术学习_大语言模型
  • 大数据平台与数据中台:从概念到落地的系统化实践指南
  • day045-nginx跳转功能补充与https
  • 安全风险监测预警平台对企业的价值
  • 【AI智能体】基于Coze 制作高质量PPT实战操作详解
  • Android Native 之 inputflinger进程分析
  • flutter flutter_vlc_player播放视频设置循环播放失效、初始化后获取不到视频宽高
  • PyQt5-高级控件-容器StackedWidget
  • 学习笔记(29):训练集与测试集划分详解:train_test_split 函数深度解析
  • Servlet开发流程(包含IntelliJ IDEA项目添加Tomcat依赖的详细教程)
  • 玄机——某学校系统中挖矿病毒应急排查
  • 打造Docker Swarm集群服务编排部署指南:从入门到精通
  • 【公司环境下发布个人NPM包完整教程】
  • 网络协议概念与应用层
  • 解释LLM怎么预测下一个词语的
  • 图像二值化方法及 Python OpenCV 实现
  • 使用v-bind指令绑定属性
  • 【第三章:神经网络原理详解与Pytorch入门】01.神经网络算法理论详解与实践-(1)神经网络预备知识(线性代数、微积分、概率等)
  • 新能源汽车功率级测试自动化方案:从理论到实践的深度解析
  • 如何将文件从 iPhone 传输到 Android(新指南)
  • 网安-XSS-pikachu
  • MUX-VLAN基本概述
  • 【格与代数系统】格与哈斯图