当前位置: 首页 > news >正文

使用DolphinScheduler接口实现批量导入工作流并上线

使用DS接口实现批量导入工作量并上线脚本

前面实现了批量生成DS的任务,当导入时发现只能逐个导入,因此通过接口实现会更方便。

DS接口文档

DS是有接口文档的地址是

http://IP:12345/dolphinscheduler/swagger-ui/index.html?language=zh_CN&lang=cn

不过这文档写的比较简略,不太能懂,那么只能自己去找了。

token

所有的接口都需要用到token
在这里插入图片描述
在安全中心-令牌管理 创建一个token 。记住这个token,后面所有的接口都需要用到 。

header

根据上面的token组成请求要用的header

token = ''
headers = {'Accept': 'application/json','token': token
}

项目ID project_id 可以在查看项目工作流时,在url中找到。

DS导入任务接口

导入任务的接口是

import_url = 'http://IP:12345/dolphinscheduler/projects/{project_id}/process-definition/import'

知道接口 就可以导入了

def import_job(file_path):
# 打开文件并读取为二进制数据with open(file_path, 'rb') as file:files = {'file': file}# 导入工作流response = requests.post(import_url, headers=headers, files=files)print(response.status_code)if response.status_code != 200:print('上传失败  '+file_path)

需要注意的是,导入任务时 只支持二进制 。
file_path 是工作流文件,具体实现 可以工作流中导出一个作为参考。
重复使用上述方法,就可以实现批量导入任务。

工作流上线

使用上述方法批量完成任务上传后,依旧有问题,逐个上线工作量也是个不小的工作量,因此继续使用接口。
经过研究发现,上线工作流需要先获取工作流的调度ID 。

获取工作流列表 - > 获取工作流code -> 获取所有工作流的调度ID -> 工作流上线

获取工作流列表

这是接口地址

jobs_url = 'http://IP:12345/dolphinscheduler/projects/{project_id}/process-definition'

不过这个要分页查询,稍微有一点点麻烦

def get_jobs_list():# 分页查询# 初始化分页参数pageNo = 1pageSize = 10url = f'{jobs_url}?pageSize=10&pageNo=1&searchVal='# 构建完整的URL# 存储所有结果all_items = list()while True:# 构建完整的URLurl = f'{jobs_url}?pageSize={pageSize}&pageNo={pageNo}&searchVal='# 发送GET请求response = requests.get(url, headers=headers)# 检查响应状态码if response.status_code == 200:# 请求成功,处理响应数据items = response.content.decode()total = json.loads(items)["data"]["total"]item = json.loads(items)["data"]["totalList"]# 将当前页的数据添加到结果列表中for i in item:all_items.append(i)# 如果当前页没有数据,退出循环if pageNo * pageSize > total:breakif not items:break# 增加页码pageNo += 1else:# 请求失败,打印错误信息print('请求失败:', response.status_code, response.text)breakreturn all_items

all_items 是所有工作流的具体内容,需要提取一下

 all_jobs = get_jobs_list()job_codes = [job['code'] for job in all_jobs]

这样就是所有的工作流code

获取调度ID

下面是调度ID的接口,因为不想分页,直接一页1000个。

schedules_url = 'http://36.133.140.132:12345/dolphinscheduler/projects/{project_id}/schedules?pageSize=1000&pageNo=1&processDefinitionCode='

使用这个接口就能拿到所有的调度ID

def schedule_id(job_code):url = schedules_url+str(job_code)response = requests.get(url, headers=headers)if response.status_code == 200:data = response.content.decode()js = json.loads(data)if len(js['data']['totalList'])>0 and js['data']['totalList'][0]['releaseState']=='OFFLINE':return js['data']['totalList'][0]['id']else:return ''

这里过滤了已经上线的调度ID 。

上线

万事俱备 终于可以上线了

online_url = 'http://36.133.140.132:12345/dolphinscheduler/projects/{project_id}/schedules/{scheduler_id}/online'

具体实现

def online_job(scheduler_id):url = online_url.format(scheduler_id=scheduler_id)response = requests.post(url, headers=headers)if response.status_code == 200:print('success')else:print('online job failed')

到此 就可以实现导入-批量全自动了。

打完收工,祝你不加班。

http://www.lryc.cn/news/468595.html

相关文章:

  • pycharm导出环境安装包列表
  • 分体式智能网关在现代电力物联网中的优势有哪些?
  • 第14篇:下一代网络与新兴技术
  • 物联网数据采集网关详细介绍-天拓四方
  • 2024软考网络工程师笔记 - 第10章.组网技术
  • C语言——字符串指针和字符串数组
  • 7-1回文判断(栈和队列PTA)
  • 使用 NCC 和 PKG 打包 Node.js 项目为可执行文件(Linux ,macOS,Windows)
  • LeetCode:2747. 统计没有收到请求的服务器数目(滑动窗口 Java)
  • 项目管理工具--【项目策划任务书】模板
  • 雷池社区版那么火,为什么站长都使用雷池社区版??
  • 分布式日志有哪些?
  • ETCD未授权访问风险基于角色认证和启用https的ca证书修复方案
  • 执行Django项目的数据库迁移命令时报错:(1050, “Table ‘django_session‘ already exists“);如何破?
  • 问丫:创新社交平台的技术魅力与发展潜力
  • iOS Swift逆向——被编译优化后的函数参数调用约定修复
  • self-supervised learning(BERT和GPT)
  • 基于RBF神经网络的双参数自适应光储VSG构网逆变器MATLAB仿真模型
  • Openpyxl--学习记录
  • 高边坡稳定安全监测预警系统解决方案
  • 计算机毕业设计 | vue+springboot借书管理 图书馆管理系统(附源码)
  • vue3 腾讯地图 InfoWindow 弹框
  • 【Linux】解锁进程间通信奥秘,高效资源共享的实战技巧
  • O1 Nano:OpenAI O1模型系列的简化开源版本
  • 浅谈人工智能之Llama3微调后使用cmmlu评估
  • 为什么需要MQ?MQ具有哪些作用?你用过哪些MQ产品?请结合过往的项目经验谈谈具体是怎么用的?
  • Flutter项目打包ios, Xcode 发布报错 Module‘flutter barcode_scanner‘not found
  • RWSENodeEncoder, KER_DIM_PE(lrgb文件中的encoders文件中的kernel.py)
  • 技术文档:基于微信朋友圈的自动点赞工具开发
  • kubernetes_pods资源清单及常用命令