当前位置: 首页 > news >正文

powerjob的worker启动,研究完了这块代码之后我发现了,代码就是现实中我们码农的真实写照

这是一篇让你受益匪浅的文章,代码即使人生。

 worker启动比server启动要复杂一些,毕竟worker是要实际干活的,工欲善其事必先利其器,所以需要准备的工具还是不能少的,server对于powerjob来说,只是一个调度用的,说白了就是管理worker做什么工作的,只需要给他一个流程,让他按照流程上的内容,一次告诉worker去工作,至于怎么做,只有worker知道,server当然不会知道的,也没有必要知道。

worker的启动大概分为以下这么几个步骤:

  1. 判断是否重复初始化

  2. 获取默认配置

  3. 校验appName

  4. 获取IP地址和端口(这一步和server端是一样的,在这里就不赘述了)

  5. 初始化定时线程

  6. 连接server

  7. 初始化Akka

  8. 初始化日志系统

  9. 初始化存储

  10. 初始化定时任务

步骤是蛮多的,但是其实都不是非常的复杂

由于worker的启动源码过于多了,就不全贴出来了。

开胃菜

首先因为该worker包是需要被依赖的,所以并没有spring的启动类,但是却有启动spring时添加其配置的内容,在worker包里面的PowerJobWorker类,是实现了ApplicationContextAware, InitializingBean, DisposableBean这三个类,这三个类默认有三个方法,分别是

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException

public void afterPropertiesSet() throws Exception

public void destroy() throws Exception

99.9%的初始化工作都是在afterPropertiesSet这个方法里完成的,看名字大概也能猜出这个方法的意思,就是字面意思。

判断是否重复初始化

if (!initialized.compareAndSet(false, true)) {log.warn("[PowerJobWorker] please do not repeat the initialization");return;
}

这段代码意思就是一个initialized的变量,代表的意思是是否初始化,一开始的时候是false,因为还没有开始初始化,然后compareAndSet后面跟着两个参数,第一个参数是预期值,如果预期值和当前的变量值一样,则将当前变量更新为第二个参数的值。

如果initialized的值是false ,和预期值一样,则compareAndSet方法返回的是true,跳出if条件语句,并且initialized值变成了true。

如果initialized的·值是true,和预期值不一样,则compareAndSet返回的是false,进入条件语句,打印告警日志,并且不再有后续的初始化操作,此时initialized的值不变,依旧是true。

获取默认配置

PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();//下面这些代码都是在之后的初始化操作中进行赋值的
workerRuntime.setWorkerAddress(workerAddress);workerRuntime.setServerDiscoveryService(serverDiscoveryService);workerRuntime.setActorSystem(actorSystem);workerRuntime.setOmsLogHandler(omsLogHandler);workerRuntime.setTaskPersistenceService(taskPersistenceService);

这个WorkerRuntime类是worker.common包里面的一个Bean类,记录了一些worker运行时的参数和环境,里面有的有默认值,有的没有默认值,需要在初始化的时候进行赋值。比如上面代码中,后面set的值

校验appName

我将里面有关打印日志的部分全部拿掉了,通过appName,去server请求appId,如果请求不到,则说明配置文件里面的“powerjob.worker.app-name”配置的有问题,所有appName都是需要注册的,所以名字是不会重复的。

private void assertAppName() {//获取到appNamePowerJobWorkerConfig config = workerRuntime.getWorkerConfig();String appName = config.getAppName();Objects.requireNonNull(appName, "appName can't be empty!");//调用server端的服务String url = "http://%s/server/assert?appName=%s";for (String server : config.getServerAddress()) {
//获取到server的请求地址String realUrl = String.format(url, server, appName);try {
//请求服务,返回结果String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
//解析返回结果ResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ResultDTO.class);if (resultDTO.isSuccess()) {
//将appId设置到运行环境里Long appId = Long.valueOf(resultDTO.getData().toString());workerRuntime.setAppId(appId);return;}else {throw new PowerJobException(resultDTO.getMessage());}}catch (PowerJobException oe) {throw oe;}catch (Exception ignore) {}}throw new PowerJobException("no server available!");
}

连接Server

 

serverDiscoveryService.start(timingPool);

最主要的就是上面这一行代码,这个代码里面主要流程如下:

  1. 将配置文件里面的服务器地址存入内存。

  2. 当前服务地址如果不为空,调用server端的/acquire服务获取结果。

  3. 如果经过第二步没有结果返回,则遍历配置文件中所有的server地址来获取结果。

  4. 如果依旧没有结果,说明连接不到server,需要将所有的本地任务停止。

  5. 如果得到结果,则将结果返回。

    private String discovery() {
    //1.将配置文件里面的服务器地址存入内存。if (ip2Address.isEmpty()) {config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x));}String result = null;
    //2.当前服务地址如果不为空,调用server端的/acquire服务获取结果。String currentServer = currentServerAddress;if (!StringUtils.isEmpty(currentServer)) {String ip = currentServer.split(":")[0];String firstServerAddress = ip2Address.get(ip);if (firstServerAddress != null) {result = acquire(firstServerAddress);}}
    //3.如果经过第二步没有结果返回,则遍历配置文件中所有的server地址来获取结果。for (String httpServerAddress : config.getServerAddress()) {if (StringUtils.isEmpty(result)) {result = acquire(httpServerAddress);}else {break;}}if (StringUtils.isEmpty(result)) {
    //4.如果依旧没有结果,说明连接不到server,需要将所有的本地任务停止。if (FAILED_COUNT++ > MAX_FAILED_COUNT) {List<Long> frequentInstanceIds = TaskTrackerPool.getAllFrequentTaskTrackerKeys();if (!CollectionUtils.isEmpty(frequentInstanceIds)) {frequentInstanceIds.forEach(instanceId -> {TaskTracker taskTracker = TaskTrackerPool.remove(instanceId);taskTracker.destroy();});}FAILED_COUNT = 0;}return null;} else {
    //5.如果得到结果,则将结果返回。FAILED_COUNT = 0;return result;}
    }

    初始化日志系统

    OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, actorSystem, serverDiscoveryService);

    这个日志系统的主要作用,就是将本地的日志上报的server上,从传进的参数就能看出,都是和通讯相关的内容。

    这个日志系统的提交也是异步单独占用一个线程,在之前开启的线程中,其中就有一个线程是用来提交日志的,该线程会在worker启动的最后开启,代码段如下:

    timingPool.scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS);

    固定每5秒提交一次日志。

    初始化存储

    worker使用的是本地的H2数据库,持久化的策略分为磁盘和内存,在worker停止的时候,会将本地的数据文件全部销毁。其主要的初始化代码在worker.persistence包里面的ConnectionFactory类中,源代码如下:

  6. private final String DISK_JDBC_URL = String.format("jdbc:h2:file:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH);
    private final String MEMORY_JDBC_URL = String.format("jdbc:h2:mem:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH);public synchronized void initDatasource(StoreStrategy strategy) {strategy = strategy == null ? StoreStrategy.DISK : strategy;HikariConfig config = new HikariConfig();config.setDriverClassName(Driver.class.getName());config.setJdbcUrl(strategy == StoreStrategy.DISK ? DISK_JDBC_URL : MEMORY_JDBC_URL);config.setAutoCommit(true);// 池中最小空闲连接数量config.setMinimumIdle(2);// 池中最大连接数量config.setMaximumPoolSize(32);dataSource = new HikariDataSource(config);try {FileUtils.forceDeleteOnExit(new File(H2_PATH));}catch (Exception ignore) {}
    }

    HikariCP 是一个高性能的 JDBC 连接池组件,HikariConfig 就是其相关的配置类。

    总结

  7. worker工作起来确实不是很容易,需要找到自己的上级,还需要记录自己工作的日志,需要一个人干好多任务,还需要再不耽误正常任务的同时,向自己的上级汇报工作,汇报自己的身体状态。简直就是我们底层程序员的真实写照啊。里面使用了很多经典的技术,也有比较新的技术,对于日志系统,做的还是让我学到了很多。

http://www.lryc.cn/news/8707.html

相关文章:

  • 配置Qt Creator
  • C++-类和对象(下)
  • 什么是仓库管理?
  • 对话系统学习概述(仅够参考)
  • 免费CRM客户管理系统真的存在吗?不仅有,还有5个!
  • C#开发的OpenRA使用自定义字典的比较函数
  • DHCP协议
  • C语言进阶——自定义类型:枚举、联合
  • 背景透明(opacity vs background)
  • 华为OD机试 - 最小施肥机能效(Python)| 真题+思路+考点+代码+岗位
  • vue2 使用 cesium 篇
  • 2023预测:PKI将受到企业重点关注
  • linux基本功系列之grep命令
  • 硬件设计——DDR
  • 最近你提前还贷了吗
  • 关于STM32常用的8种GPIO输入输出模式的理解
  • vue - vue项目中解决 IOS + H5 滑动边界橡皮筋弹性效果
  • webpack(高级)--创建自己的loader 同步loader 异步loader loader参数校验
  • Assignment写作各个部分怎么衔接完美?
  • 医疗器械实验室设计规划全了SICOLAB
  • 2023年浙江建筑施工物料提升(建筑特种作业)模拟试题及答案
  • shell编程经典案例,建议收藏
  • C++通用容器
  • 字符串的特殊读取——基于蓝桥杯两道题目(C/C++)
  • [足式机器人]Part3机构运动微分几何学分析与综合Ch01-4 平面运动微分几何学——【读书笔记】
  • 【每日一题Day120】LC2341数组能形成多少数对 | 哈希表 排序
  • win11/10+opencv3.x/4.x配置 VS2019方法(简单使用,亲测)
  • HTTP协议---详细讲解
  • Syntax-Aware Aspect-Level Sentiment Classification with PWCN 论文阅读笔记
  • hadoop考试应急