当前位置: 首页 > article >正文

dagster的etl实现

本文展示了如何使用Dagster框架实现一个动态ETL(Extract, Transform, Load)流程。通过定义多个操作(op),包括生成动态任务、处理单个任务、收集结果和汇总结果,构建了一个动态任务处理流程。generate_tasks操作生成多个动态任务,process_task对每个任务进行处理,collect_results收集所有处理结果,summarize_results汇总结果并生成资产。最后,通过Definitions将流程定义为可执行的作业(job),并提供了直接运行流程的示例代码

注意dagster版本

dagster, version 1.10.14

"""
实现dagster的etl实践案例。
"""from dagster import op, graph, job
from dagster import DynamicOut, DynamicOutput, AssetMaterialization, Out, Output
from dagster import Definitions
from typing import List@op(out=DynamicOut(int))
def generate_tasks(context):"""生成动态任务,每个任务对应一个整数值"""context.log.info("开始生成动态任务...")for i in range(3):yield DynamicOutput(value=i, mapping_key=f"task_{i}")@op
def process_task(context, num: int) -> int:"""处理单个任务,将输入值乘以2"""context.log.info(f"处理任务 {num}")return num * 2@op(out=Out(List[int]))
def collect_results(context, results: List[int]) -> List[int]:"""收集所有处理结果并返回列表"""context.log.info(f"收集到 {len(results)} 个处理结果")context.log.info(f"数据细节:{results}")return results@op(out = Out(int))
def summarize_results(context, results: list):"""汇总处理结果,计算总和"""total = sum(results)total = int(total)context.log.info(f"所有任务处理完成,总和为: {total}")yield AssetMaterialization(asset_key="final_result",description="所有任务处理结果的总和",metadata={"total": total})yield Output(total, output_name="result")@graph
def dynamic_pipeline():"""定义动态任务处理流程"""results = generate_tasks().map(process_task)collected = collect_results(results.collect())summarize_results(collected)# 将graph转换为可执行的job
@job
def run_dynamic_pipeline():dynamic_pipeline()# 定义可执行实体
defs = Definitions(jobs=[run_dynamic_pipeline]
)# 示例:直接运行pipeline(用于测试)
if __name__ == "__main__":result = run_dynamic_pipeline.execute_in_process()print("执行结果:", result.success)
http://www.lryc.cn/news/2378356.html

相关文章:

  • python的漫画网站管理系统
  • 源码安装gperftools工具
  • QMK 宏(Macros)功能详解(实战部分)
  • 前端脚手架开发指南:提高开发效率的核心操作
  • 搜索引擎工作原理|倒排索引|query改写|CTR点击率预估|爬虫
  • Python实例题:Python自动工资条
  • Function Calling万字实战指南:打造高智能数据分析Agent平台
  • spark MySQL数据库配置
  • python四则运算计算器
  • 线对板连接器的兼容性问题:为何老旧设计难以满足现代需求?
  • AI517 AI本地部署 docker微调(失败)
  • VR和眼动控制集群机器人的方法
  • python训练营打卡第26天
  • TiDB 中新 Hash Join 的设计与性能优化
  • 1.共享内存(python共享内存实际案例,传输opencv frame)
  • 网页常见水印实现方式
  • oracle主备切换参考
  • Java大师成长计划之第25天:Spring生态与微服务架构之容错与断路器模式
  • 【ARM】MDK如何将变量存储到指定内存地址
  • Unity3D仿星露谷物语开发44之收集农作物
  • langchain—chatchat
  • 经典算法 求C(N, K) % mod,保证mod是质数
  • 【LeetCode 热题 100】二叉树的最大深度 / 翻转二叉树 / 二叉树的直径 / 验证二叉搜索树
  • 关于软件测试开发的一些有趣的知识
  • uni-app 开发HarmonyOS的鸿蒙影视项目分享:从实战案例到开源后台
  • 售前工作.工作流程和工具
  • GPU与NPU异构计算任务划分算法研究:基于强化学习的Transformer负载均衡实践
  • 学习ai课程大纲
  • 基于CentOS7制作OpenSSL 1.1的RPM包
  • 数据分析_Python