XXL-JOB(2)
Glue模式
任务以源码的形式去维护调度中心,支持实时编译,无需指定JobHandler。
实际上是继承自JobHandler的java类代码,在执行器中运行,可以使用@Resource/@Autowire注入执行器里中的其他服务.
在执行器中添加service
@Service
public class HelloService {public void methodA(){System.out.println("执行MethodA的方法");}public void methodB(){System.out.println("执行MethodB的方法");}
}
在 调度中心添加Gule任务,在gule idea中添加代码
package com.xxl.job.service.handler;import cn.wolfcode.xxljobdemo.service.HelloService;
import com.xxl.job.core.handler.IJobHandler;
import org.springframework.beans.factory.annotation.Autowired;public class DemoGlueJobHandler extends IJobHandler {@Autowiredprivate HelloService helloService;@Overridepublic void execute() throws Exception {helloService.methodA();}
}
启动任务即可。
集群
启动两个springboot项目,修改tomcat端口号和执行器端口号
-Dserver.port=8093 -Dxxl.job.executor.port=9993
-Dserver.port=8094 -Dxxl.job.executor.port=9994
将调度中心的任务修改为轮询路由
调度路由
-
FIRST(第一个):固定选择第一个机器
-
LAST(最后一个):固定选择最后一个机器;
-
ROUND(轮询):依次的选择在线的机器发起调度
-
RANDOM(随机):随机选择在线的机器;
-
CONSISTENT_HASH(一致性HASH):
每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
-
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
-
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
-
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
-
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
-
SHARDING_BROADCAST(分片广播):
广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
分片广播
xxl-job调度中心发出一次调度,所有相关节点全部执行一次。
如果使用传统的轮询调度,只会将所有的请求由一个结点承担,效率低。
而分片广播的优点:
1.将所有请求分配个各个结点,由每个结点承担,具体分片策略由逻辑代码决定,调度中心知识让所有的结点执行。
2.XXL-JOB为每个注册结点分配一个index,同样XXL-JOB会记录结点总数
int shardIndex = XxlJobHelper.getShardIndex();结点数
int shardTotal = XxlJobHelper.getShardTotal();结点总数
案例:
1.添加依赖
<!--MyBatis驱动-->
<dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>1.2.0</version>
</dependency>
<!--mysql驱动-->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId>
</dependency>
<!--lombok依赖-->
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>provided</scope>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.10</version>
</dependency>
2. 添加配置
spring.datasource.url=jdbc:mysql://localhost:3306/xxl_job_demo?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=UTF-8
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.username=root
spring.datasource.password=WolfCode_2017
3.添加实体类
@Setter@Getter
public class UserMobilePlan {private Long id;//主键private String username;//用户名private String nickname;//昵称private String phone;//手机号码private String info;//备注
}
4.添加mapper处理
@Mapper
public interface UserMobilePlanMapper {@Select("select * from t_user_mobile_plan where mod(id,#{shardingTotal})=#{shardingIndex}")List<UserMobilePlan> selectByMod(@Param("shardingIndex") Integer shardingIndex,@Param("shardingTotal")Integer shardingTotal);@Select("select * from t_user_mobile_plan")List<UserMobilePlan> selectAll();
}
将id和分片索引总数取余
@XxlJob("sendMsgShardingHandler")
public void sendMsgShardingHandler() throws Exception{System.out.println("任务开始时间:"+new Date());int shardTotal = XxlJobHelper.getShardTotal();int shardIndex = XxlJobHelper.getShardIndex();List<UserMobilePlan> userMobilePlans = null;if(shardTotal==1){//如果没有分片就直接查询所有数据userMobilePlans = userMobilePlanMapper.selectAll();}else{userMobilePlans = userMobilePlanMapper.selectByMod(shardIndex,shardTotal);}System.out.println("处理任务数量:"+userMobilePlans.size());Long startTime = System.currentTimeMillis();userMobilePlans.forEach(item->{try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}});System.out.println("任务结束时间:"+new Date());System.out.println("任务耗时:"+(System.currentTimeMillis()-startTime)+"毫秒");
}