当前位置: 首页 > news >正文

聊聊PowerJob的AliOssService

本文主要研究一下PowerJob的AliOssService

DFsService

tech/powerjob/server/extension/dfs/DFsService.java

public interface DFsService {/*** 存储文件* @param storeRequest 存储请求* @throws IOException 异常*/void store(StoreRequest storeRequest) throws IOException;/*** 下载文件* @param downloadRequest 文件下载请求* @throws IOException 异常*/void download(DownloadRequest downloadRequest) throws IOException;/*** 获取文件元信息* @param fileLocation 文件位置* @return 存在则返回文件元信息* @throws IOException 异常*/Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException;/*** 清理 powerjob 认为“过期”的文件* 部分存储系统自带生命周期管理(如阿里云OSS,则不需要单独实现该方法)* @param bucket bucket* @param days 天数,需要清理超过 X 天的文件*/default void cleanExpiredFiles(String bucket, int days) {}
}

DFsService接口定义了store、download、fetchFileMeta、cleanExpiredFiles方法

AbstractDFsService

tech/powerjob/server/persistence/storage/AbstractDFsService.java

@Slf4j
public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, DisposableBean {protected ApplicationContext applicationContext;public AbstractDFsService() {log.info("[DFsService] invoke [{}]'s constructor", this.getClass().getName());}abstract protected void init(ApplicationContext applicationContext);protected static final String PROPERTY_KEY = "oms.storage.dfs";protected static String fetchProperty(Environment environment, String dfsType, String key) {String pKey = String.format("%s.%s.%s", PROPERTY_KEY, dfsType, key);return environment.getProperty(pKey);}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;log.info("[DFsService] invoke [{}]'s setApplicationContext", this.getClass().getName());init(applicationContext);}
}

AbstractDFsService声明实现DFsService、ApplicationContextAware、DisposableBean接口,它在setApplicationContext方法执行了init

AliOssService

tech/powerjob/server/persistence/storage/impl/AliOssService.java

@Slf4j
@Priority(value = Integer.MAX_VALUE - 1)
@Conditional(AliOssService.AliOssCondition.class)
public class AliOssService extends AbstractDFsService {private static final String TYPE_ALI_OSS = "alioss";private static final String KEY_ENDPOINT = "endpoint";private static final String KEY_BUCKET = "bucket";private static final String KEY_CREDENTIAL_TYPE = "credential_type";private static final String KEY_AK = "ak";private static final String KEY_SK = "sk";private static final String KEY_TOKEN = "token";private OSS oss;private String bucket;private static final int DOWNLOAD_PART_SIZE = 10240;private static final String NO_SUCH_KEY = "NoSuchKey";//......void initOssClient(String endpoint, String bucket, String mode, String ak, String sk, String token) throws Exception {log.info("[AliOssService] init OSS by config: endpoint={},bucket={},credentialType={},ak={},sk={},token={}", endpoint, bucket, mode, ak, sk, token);if (StringUtils.isEmpty(bucket)) {throw new IllegalArgumentException("'oms.storage.dfs.alioss.bucket' can't be empty, please creat a bucket in aliyun oss console then config it to powerjob");}this.bucket = bucket;CredentialsProvider credentialsProvider;CredentialType credentialType = CredentialType.parse(mode);switch (credentialType) {case PWD:credentialsProvider = new DefaultCredentialProvider(ak, sk, token);break;case SYSTEM_PROPERTY:credentialsProvider = CredentialsProviderFactory.newSystemPropertiesCredentialsProvider();break;default:credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();}this.oss = new OSSClientBuilder().build(endpoint, credentialsProvider);log.info("[AliOssService] initialize successfully, THIS_WILL_BE_THE_STORAGE_LAYER.");}//......    
}    

AliOssService继承了AbstractDFsService

store

    @Overridepublic void store(StoreRequest storeRequest) throws IOException {ObjectMetadata objectMetadata = new ObjectMetadata();PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, parseFileName(storeRequest.getFileLocation()), storeRequest.getLocalFile(), objectMetadata);oss.putObject(putObjectRequest);}

store方法创建PutObjectRequest,使用oss.putObject进行上传

download

    @Overridepublic void download(DownloadRequest downloadRequest) throws IOException {FileLocation dfl = downloadRequest.getFileLocation();DownloadFileRequest downloadFileRequest = new DownloadFileRequest(bucket, parseFileName(dfl), downloadRequest.getTarget().getAbsolutePath(), DOWNLOAD_PART_SIZE);try {FileUtils.forceMkdirParent(downloadRequest.getTarget());oss.downloadFile(downloadFileRequest);} catch (Throwable t) {ExceptionUtils.rethrow(t);}}

download方法则根据DownloadRequest指定的FileLocation创建DownloadFileRequest,然后通过oss.downloadFile(downloadFileRequest)进行下载

fetchFileMeta

    @Overridepublic Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {try {ObjectMetadata objectMetadata = oss.getObjectMetadata(bucket, parseFileName(fileLocation));return Optional.ofNullable(objectMetadata).map(ossM -> {Map<String, Object> metaInfo = Maps.newHashMap();metaInfo.putAll(ossM.getRawMetadata());if (ossM.getUserMetadata() != null) {metaInfo.putAll(ossM.getUserMetadata());}return new FileMeta().setLastModifiedTime(ossM.getLastModified()).setLength(ossM.getContentLength()).setMetaInfo(metaInfo);});} catch (OSSException oe) {String errorCode = oe.getErrorCode();if (NO_SUCH_KEY.equalsIgnoreCase(errorCode)) {return Optional.empty();}ExceptionUtils.rethrow(oe);}return Optional.empty();}

fetchFileMeta方法通过oss.getObjectMetadata获取ObjectMetadata

cleanExpiredFiles

    @Overridepublic void cleanExpiredFiles(String bucket, int days) {/*阿里云 OSS 自带生命周期管理,请参考文档进行配置,代码层面不进行实现(浪费服务器资源)https://help.aliyun.com/zh/oss/user-guide/overview-54阿里云 OSS 自带生命周期管理,请参考文档进行配置,代码层面不进行实现(浪费服务器资源)https://help.aliyun.com/zh/oss/user-guide/overview-54阿里云 OSS 自带生命周期管理,请参考文档进行配置,代码层面不进行实现(浪费服务器资源)https://help.aliyun.com/zh/oss/user-guide/overview-54*/}

cleanExpiredFiles则是空操作

init

    protected void init(ApplicationContext applicationContext) {Environment environment = applicationContext.getEnvironment();String endpoint = fetchProperty(environment, TYPE_ALI_OSS, KEY_ENDPOINT);String bkt = fetchProperty(environment, TYPE_ALI_OSS, KEY_BUCKET);String ct = fetchProperty(environment, TYPE_ALI_OSS, KEY_CREDENTIAL_TYPE);String ak = fetchProperty(environment, TYPE_ALI_OSS, KEY_AK);String sk = fetchProperty(environment, TYPE_ALI_OSS, KEY_SK);String token = fetchProperty(environment, TYPE_ALI_OSS, KEY_TOKEN);try {initOssClient(endpoint, bkt, ct, ak, sk, token);} catch (Exception e) {ExceptionUtils.rethrow(e);}}

init则是通过environment获取相关属性,然后执行initOssClient

小结

DFsService接口定义了store、download、fetchFileMeta、cleanExpiredFiles方法;AbstractDFsService声明实现DFsService、ApplicationContextAware、DisposableBean接口,它在setApplicationContext方法执行了init;AliOssService继承了AbstractDFsService,通过ossClient实现了store、download、fetchFileMeta方法。

http://www.lryc.cn/news/284884.html

相关文章:

  • 【VRTK】【Unity】【PICO】PICO项目打包后闪退的根本原因
  • 《PCI Express体系结构导读》随记 —— 第I篇 第2章 PCI总线的桥与配置(21)
  • 大数据前馈神经网络解密:深入理解人工智能的基石
  • 【新书推荐】Web3.0应用开发实战(从Web 2.0到Web 3.0)
  • vue3中状态管理库pinia的安装和使用方法介绍及和vuex的区别
  • 领略指针之妙
  • 迭代器模式介绍
  • 算法每日一题: 最大字符串匹配数目 | 哈希 | 哈希表 | 题意分析
  • 自然语言处理(Natural Language Processing,NLP)解密
  • 【DevOps-08-5】目标服务器准备脚本,并基于Harbor的最终部署
  • 用Java实现01背包问题 用贪心算法
  • JUC并发编程-8锁现象
  • 集美大学“第15届蓝桥杯大赛(软件类)“校内选拔赛 D矩阵选数
  • Android System Service系统服务--1
  • 【RT-DETR有效改进】华为 | Ghostnetv1一种专为移动端设计的特征提取网络
  • 45个经典Linux面试题!赶紧收藏!
  • 将字符串中可能被视为正则表达式的特殊字符进行转义re.escape()
  • C语言:函数指针的使用
  • 「实战应用」如何用DHTMLX Gantt构建类似JIRA式的项目路线图(二)
  • Webpack5入门到原理18:Plugin 原理
  • PWM之舵机
  • Python并发与多线程:IO并发(阻塞IO、非阻塞IO、IO多路复用、异步IO)
  • React16源码: React中的IndeterminateComponent的源码实现
  • SpringBoot:详解Bean生命周期和作用域
  • 【图解数据结构】顺序表实战指南:手把手教你详细实现(超详细解析)
  • WordPress怎么禁用文章和页面古腾堡块编辑器?如何恢复经典小工具?
  • 【HarmonyOS】掌握布局组件,提升应用体验
  • 第4周:Pytorch——综合应用和实战项目 Day 28-30: 学习资源和社区参与
  • TypeScript教程(一)在vscode中的配置TypeScript环境
  • sshpass的安装与使用