当前位置: 首页 > news >正文

聊聊PowerJob的Alarmable

本文主要研究一下PowerJob的Alarmable

Alarmable

tech/powerjob/server/extension/Alarmable.java

public interface Alarmable {void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
}

Alarmable接口定义了onFailed方法,其入参为alarm及targetUserList

Alarm

public interface Alarm extends PowerSerializable {String fetchTitle();default String fetchContent() {StringBuilder sb = new StringBuilder();JSONObject content = JSONObject.parseObject(JSONObject.toJSONString(this));content.forEach((key, originWord) -> {sb.append(key).append(": ");String word = String.valueOf(originWord);if (StringUtils.endsWithIgnoreCase(key, "time") || StringUtils.endsWithIgnoreCase(key, "date")) {try {if (originWord instanceof Long) {word = CommonUtils.formatTime((Long) originWord);}}catch (Exception ignore) {}}sb.append(word).append(OmsConstant.LINE_SEPARATOR);});return sb.toString();}
}

Alarm定义了fetchTitle方法,提供了fetchContent默认方法,它有两个实现类分别是JobInstanceAlarm、WorkflowInstanceAlarm

DingTalkAlarmService

tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkAlarmService.java

@Slf4j
@Service
@RequiredArgsConstructor
public class DingTalkAlarmService implements Alarmable {private final Environment environment;private Long agentId;private DingTalkUtils dingTalkUtils;private Cache<String, String> mobile2UserIdCache;private static final int CACHE_SIZE = 8192;/*** 防止缓存击穿*/private static final String EMPTY_TAG = "EMPTY";@Overridepublic void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {if (dingTalkUtils == null) {return;}Set<String> userIds = Sets.newHashSet();targetUserList.forEach(user -> {String phone = user.getPhone();if (StringUtils.isEmpty(phone)) {return;}try {String userId = mobile2UserIdCache.get(phone, () -> {try {return dingTalkUtils.fetchUserIdByMobile(phone);} catch (PowerJobException ignore) {return EMPTY_TAG;} catch (Exception ignore) {return null;}});if (!EMPTY_TAG.equals(userId)) {userIds .add(userId);}}catch (Exception ignore) {}});userIds.remove(null);if (!userIds.isEmpty()) {String userListStr = SJ.COMMA_JOINER.skipNulls().join(userIds);List<DingTalkUtils.MarkdownEntity> markdownEntities = Lists.newLinkedList();markdownEntities.add(new DingTalkUtils.MarkdownEntity("server", NetUtils.getLocalHost()));String content = alarm.fetchContent().replaceAll(OmsConstant.LINE_SEPARATOR, OmsConstant.COMMA);markdownEntities.add(new DingTalkUtils.MarkdownEntity("content", content));try {dingTalkUtils.sendMarkdownAsync(alarm.fetchTitle(), markdownEntities, userListStr, agentId);}catch (Exception e) {log.error("[DingTalkAlarmService] send ding message failed, reason is {}", e.getMessage());}}}@PostConstructpublic void init() {String agentId = environment.getProperty(PowerJobServerConfigKey.DING_AGENT_ID);String appKey = environment.getProperty(PowerJobServerConfigKey.DING_APP_KEY);String appSecret = environment.getProperty(PowerJobServerConfigKey.DING_APP_SECRET);log.info("[DingTalkAlarmService] init with appKey:{},appSecret:{},agentId:{}", appKey, appSecret, agentId);if (StringUtils.isAnyBlank(agentId, appKey, appSecret)) {log.warn("[DingTalkAlarmService] cannot get agentId, appKey, appSecret at the same time, this service is unavailable");return;}if (!StringUtils.isNumeric(agentId)) {log.warn("[DingTalkAlarmService] DingTalkAlarmService is unavailable due to invalid agentId: {}", agentId);return;}this.agentId = Long.valueOf(agentId);dingTalkUtils = new DingTalkUtils(appKey, appSecret);mobile2UserIdCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).softValues().build();log.info("[DingTalkAlarmService] init DingTalkAlarmService successfully!");}}

DingTalkAlarmService实现了Alarmable接口,其onFailed遍历targetUserList获取userId,最后通过dingTalkUtils.sendMarkdownAsync发送

MailAlarmService

tech/powerjob/server/extension/defaultimpl/alarm/impl/MailAlarmService.java

@Slf4j
@Service
public class MailAlarmService implements Alarmable {@Resourceprivate Environment environment;private JavaMailSender javaMailSender;@Value("${spring.mail.username:''}")private String from;@Overridepublic void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) {return;}SimpleMailMessage sm = new SimpleMailMessage();try {sm.setFrom(from);sm.setTo(targetUserList.stream().map(UserInfoDO::getEmail).filter(Objects::nonNull).toArray(String[]::new));sm.setSubject(alarm.fetchTitle());sm.setText(alarm.fetchContent());javaMailSender.send(sm);}catch (Exception e) {log.warn("[MailAlarmService] send mail failed, reason is {}", e.getMessage());}}@Autowired(required = false)public void setJavaMailSender(JavaMailSender javaMailSender) {this.javaMailSender = javaMailSender;}}

MailAlarmService实现了Alarmable接口,其onFailed方法构建SimpleMailMessage,然后通过spring的javaMailSender.send发送

WebHookAlarmService

tech/powerjob/server/extension/defaultimpl/alarm/impl/WebHookAlarmService.java

@Slf4j
@Service
public class WebHookAlarmService implements Alarmable {private static final String HTTP_PROTOCOL_PREFIX = "http://";private static final String HTTPS_PROTOCOL_PREFIX = "https://";@Overridepublic void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {if (CollectionUtils.isEmpty(targetUserList)) {return;}targetUserList.forEach(user -> {String webHook = user.getWebHook();if (StringUtils.isEmpty(webHook)) {return;}// 自动添加协议头if (!webHook.startsWith(HTTP_PROTOCOL_PREFIX) && !webHook.startsWith(HTTPS_PROTOCOL_PREFIX)) {webHook = HTTP_PROTOCOL_PREFIX + webHook;}MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);RequestBody requestBody = RequestBody.create(jsonType, JSONObject.toJSONString(alarm));try {String response = HttpUtils.post(webHook, requestBody);log.info("[WebHookAlarmService] invoke webhook[url={}] successfully, response is {}", webHook, response);}catch (Exception e) {log.warn("[WebHookAlarmService] invoke webhook[url={}] failed!", webHook, e);}});}
}

WebHookAlarmService实现了Alarmable接口,其onFailed方法遍历targetUserList,挨个执行HttpUtils.post(webHook, requestBody),用的是okhttp3来实现http请求回调

小结

PowerJob的Alarmable接口定义了onFailed方法,其入参为alarm及targetUserList;它有三个实现类,分别是DingTalkAlarmService(用的是DingTalkClient)、MailAlarmService(用的是spring的JavaMailSender)、WebHookAlarmService(用的是okhttp3的OkHttpClient)。

http://www.lryc.cn/news/280430.html

相关文章:

  • 系列三十五、获取Excel中的总记录数
  • VMware workstation安装debian-12.1.0虚拟机并配置网络
  • centos下系统全局检测工具dstat使用
  • 无人机群ros通信
  • LeetCode刷题:142. 环形链表 II
  • Laravel 使用rdkafka_laravel详细教程(实操避坑)
  • 439 - Knight Moves (UVA)
  • 数据结构(c)冒泡排序
  • 并发编程之并发容器
  • K8s---存储卷(动态pv和pvc)
  • JS判断对象是否为空对象的几种方法
  • 算法通关村第十五关—用4KB内存寻找重复元素(青铜)
  • 【PHP】判断字符串是否是有效的base64编码
  • 鼎盛合|测量精度SOC芯片开发中的技术问题整理
  • sql | 学生参加各科考试次数
  • uniapp(vue2)+VoerkaI18n多语言
  • C51--测速小车
  • ORACLE报错:ORA-04091 表XXX发生了变化,触发器/函数不能读它
  • Arm LDM和STM的寻址方式
  • 网络技术基础入门全套实验-厦门微思网络CCNA实验手册
  • 【已解决】C语言实现多线程检索数据
  • 用LM Studio:2分钟在本地免费部署大语言模型,替代ChatGPT
  • C语言经典算法之直接排序算法
  • 前端开发vscode 常用插件记录
  • 基于JavaWeb+BS架构+SpringBoot+Vue基于web的多媒体素材管理系统的设计和实现
  • 常用的dom操作
  • Hotspot源码解析-第十七章-虚拟机万物创建(三)
  • Spring MVC 的RequestMapping注解
  • navicat for oracle
  • 行业分享----dbaplus174期:美团基于Orchestrator的MySQL高可用实践