当前位置: 首页 > news >正文

使用Dagster资产工厂模式高效管理重复ETL任务

本文介绍了如何利用Dagster的资产工厂模式来高效管理和自动化重复的ETL(提取、转换、加载)任务。通过Python函数和YAML配置文件的结合,我们可以轻松地创建和管理大量相似的资产,同时提高代码的可维护性和可配置性。文章还探讨了如何使用Pydantic和Jinja进一步提升配置文件的类型安全性和灵活性。

资产工厂模式简介

在数据工程领域,我们经常需要处理大量相似的ETL任务。例如,从S3下载CSV文件,执行SQL查询,然后将结果上传回S3。这种重复性工作不仅耗时,而且容易出错。幸运的是,Dagster提供了资产工厂模式,可以帮助我们优雅地解决这个问题。

资产工厂模式是一种设计模式,它允许我们通过一个函数接收配置参数,并返回一组Dagster资产定义。这种方法非常适合处理大量相似的资产,因为它可以将重复的逻辑封装在一个函数中,从而减少代码冗余和提高可维护性。

在这里插入图片描述

Python实现资产工厂

让我们通过一个具体的例子来了解如何使用Python实现资产工厂。假设我们有一个团队需要频繁地从S3下载CSV文件,执行SQL查询,然后将结果上传回S3。我们可以定义一个资产工厂函数build_etl_job,它接收S3资源配置、桶名、源文件、目标文件和SQL查询作为参数,并返回一组Dagster资产定义。

import tempfileimport dagster_aws.s3 as s3
import duckdbimport dagster as dgdef build_etl_job(s3_resource: s3.S3Resource,bucket: str,source_object: str,target_object: str,sql: str,
) -> dg.Definitions:# asset keys cannot contain '.'asset_key = f"etl_{bucket}_{target_object}".replace(".", "_")@dg.asset(name=asset_key)def etl_asset(context):with tempfile.TemporaryDirectory() as root:source_path = f"{root}/{source_object}"target_path = f"{root}/{target_object}"# these steps could be split into separate assets, but# for brevity we will keep them together.# 1. extractcontext.resources.s3.download_file(bucket, source_object, source_path)# 2. transformdb = duckdb.connect(":memory:")db.execute(f"CREATE TABLE source AS SELECT * FROM read_csv('{source_path}');")db.query(sql).to_csv(target_path)# 3. loadcontext.resources.s3.upload_file(bucket, target_object, target_path)return dg.Definitions(assets=[etl_asset],resources={"s3": s3_resource},)s3_resource = s3.S3Resource(aws_access_key_id="...", aws_secret_access_key="...")defs = dg.Definitions.merge(build_etl_job(s3_resource=s3_resource,bucket="my_bucket",source_object="raw_transactions.csv",target_object="cleaned_transactions.csv",sql="SELECT * FROM source WHERE amount IS NOT NULL;",),build_etl_job(s3_resource=s3_resource,bucket="my_bucket",source_object="all_customers.csv",target_object="risky_customers.csv",sql="SELECT * FROM source WHERE risk_score > 0.8;",),
)

通过这种方式,我们可以轻松地创建和管理多个相似的ETL任务,只需调用build_etl_job函数并传入不同的配置参数即可。

使用YAML配置资产工厂

虽然Python实现已经大大简化了代码,但我们还可以进一步改进,使配置更加灵活和易于管理。我们可以使用YAML文件来定义ETL任务的配置,并通过Python代码解析YAML文件并创建资产定义。

etl_jobs.yaml
aws:access_key_id: "YOUR_ACCESS_KEY_ID"secret_access_key: "YOUR_SECRET_ACCESS_KEY"
etl_jobs:- bucket: my_bucketsource: raw_transactions.csvtarget: cleaned_transactions.csvsql: SELECT * FROM source WHERE amount IS NOT NULL- bucket: my_bucketsource: all_customers.csvtarget: risky_customers.csvsql: SELECT * FROM source WHERE risk_score > 0.8

然后,我们可以编写一个Python函数load_etl_jobs_from_yaml来解析YAML文件并创建资产定义。

import dagster_aws.s3 as s3
import yamlimport dagster as dgdef build_etl_job(s3_resource: s3.S3Resource,bucket: str,source_object: str,target_object: str,sql: str,
) -> dg.Definitions:# Code from previous example omittedreturn dg.Definitions()def load_etl_jobs_from_yaml(yaml_path: str) -> dg.Definitions:config = yaml.safe_load(open(yaml_path))s3_resource = s3.S3Resource(aws_access_key_id=config["aws"]["access_key_id"],aws_secret_access_key=config["aws"]["secret_access_key"],)defs = []for job_config in config["etl_jobs"]:defs.append(build_etl_job(s3_resource=s3_resource,bucket=job_config["bucket"],source_object=job_config["source"],target_object=job_config["target"],sql=job_config["sql"],))return dg.Definitions.merge(*defs)defs = load_etl_jobs_from_yaml("etl_jobs.yaml")

用Pydantic 和 Jinja提升配置文件的类型安全性和灵活性

虽然YAML配置文件已经大大简化了配置过程,但它仍然存在一些问题,例如缺乏类型检查和安全性。为了解决这些问题,我们可以使用Pydantic来定义配置文件的schema,并使用Jinja来模板化配置文件中的环境变量。

首先定义etl_jobs.yaml文件:

aws:access_key_id: "{{ env.AWS_ACCESS_KEY_ID }}"secret_access_key: "{{ env.AWS_SECRET_ACCESS_KEY }}"etl_jobs:- bucket: my_bucketsource: raw_transactions.csvtarget: cleaned_transactions.csvsql: SELECT * FROM source WHERE amount IS NOT NULL- bucket: my_bucketsource: all_customers.csvtarget: risky_customers.csvsql: SELECT * FROM source WHERE risk_score > 0.8

python实现代码:

import os
from typing import Listimport dagster_aws.s3 as s3
import jinja2
import pydantic
import yamlimport dagster as dgdef build_etl_job(s3_resource: s3.S3Resource,bucket: str,source_object: str,target_object: str,sql: str,
) -> dg.Definitions:# Code from previous example omittedreturn dg.Definitions()class AwsConfig(pydantic.BaseModel):access_key_id: strsecret_access_key: strdef to_resource(self) -> s3.S3Resource:return s3.S3Resource(aws_access_key_id=self.access_key_id,aws_secret_access_key=self.secret_access_key,)class JobConfig(pydantic.BaseModel):bucket: strsource: strtarget: strsql: strdef to_etl_job(self, s3_resource: s3.S3Resource) -> dg.Definitions:return build_etl_job(s3_resource=s3_resource,bucket=self.bucket,source_object=self.source,target_object=self.target,sql=self.sql,)class EtlJobsConfig(pydantic.BaseModel):aws: AwsConfigetl_jobs: list[JobConfig]def to_definitions(self) -> dg.Definitions:s3_resource = self.aws.to_resource()return dg.Definitions.merge(*[job.to_etl_job(s3_resource) for job in self.etl_jobs])def load_etl_jobs_from_yaml(yaml_path: str) -> dg.Definitions:yaml_template = jinja2.Environment().from_string(open(yaml_path).read())config = yaml.safe_load(yaml_template.render(env=os.environ))return EtlJobsConfig.model_validate(config).to_definitions()defs = load_etl_jobs_from_yaml("etl_jobs_with_jinja.yaml")

通过这种方式,我们可以确保配置文件的类型安全性,并且可以轻松地引用环境变量,从而避免在配置文件中硬编码敏感信息。

总结

通过本文的介绍,我们了解了如何使用Dagster的资产工厂模式来高效管理和自动化重复的ETL任务。从Python实现到YAML配置,再到使用Pydantic和Jinja提升配置文件的类型安全性和灵活性,我们一步步地构建了一个强大而灵活的ETL任务管理框架。这种方法不仅可以减少代码冗余,提高可维护性,还可以使配置更加灵活和安全。希望本文能为你在数据工程领域的实践提供有价值的启发和帮助。

http://www.lryc.cn/news/572495.html

相关文章:

  • 识别网络延迟与带宽瓶颈
  • M1芯片macOS安装Xinference部署大模型
  • Datawhale 网络爬虫技术入门第2次笔记
  • QT6与VS下实现没有CMD窗口的C++控制台程序
  • 日本生活:日语语言学校-日语作文-沟通无国界(3)-题目:わたしの友達
  • 编程马拉松的定义、运作与发展
  • C语言标准I/O库详解:文件操作与缓冲区机制
  • Qt蓝图式技能编辑器状态机模块设计与实现
  • html实现登录与注册功能案例(不写死且只使用js)
  • 深入解析select模型:FD_SET机制与1024限制的终极指南
  • Linux系统远程操作和程序编译
  • 23.ssr和csr的对比?如何依赖node.js实现
  • [11-5]硬件SPI读写W25Q64 江协科技学习笔记(20个知识点)
  • 嵌入式编译工具链熟悉与游戏移植
  • 基于C#的Baumer相机二次开发教程
  • OpenSSL引擎 + PKCS11 + SoftHSM2认证
  • WHAT - React Native 开发 App 从 0 到上线全流程周期
  • 【嵌入式】鲁班猫玩法大全
  • 第1章: 伯努利模型的极大似然估计与贝叶斯估计
  • 软件工程(期末复习班)
  • 23种设计模式--简单工厂模式理解版
  • Arduino Nano 33 BLE Sense Rev 2开发板使用指南之【外设开发】
  • 零基础指南:利用Cpolar内网穿透实现Synology Drive多端笔记同步
  • Linux基本指令篇 —— mkdir指令
  • MFC中使用CRichEditCtrl控件让文本框中的内容部分加粗
  • 分布变化的模仿学习算法
  • 257. 二叉树的所有路径(js)
  • 【数据治理】要点整理-信息技术服务治理第5部分-数据治理规范-GBT+34960.5-2018
  • C#设计模式之AbstractFactory_抽象工厂_对象创建新模式-练习制作PANL(一)
  • C# winform教程(二)----GroupBox