使用Dagster资产工厂模式高效管理重复ETL任务
本文介绍了如何利用Dagster的资产工厂模式来高效管理和自动化重复的ETL(提取、转换、加载)任务。通过Python函数和YAML配置文件的结合,我们可以轻松地创建和管理大量相似的资产,同时提高代码的可维护性和可配置性。文章还探讨了如何使用Pydantic和Jinja进一步提升配置文件的类型安全性和灵活性。
资产工厂模式简介
在数据工程领域,我们经常需要处理大量相似的ETL任务。例如,从S3下载CSV文件,执行SQL查询,然后将结果上传回S3。这种重复性工作不仅耗时,而且容易出错。幸运的是,Dagster提供了资产工厂模式,可以帮助我们优雅地解决这个问题。
资产工厂模式是一种设计模式,它允许我们通过一个函数接收配置参数,并返回一组Dagster资产定义。这种方法非常适合处理大量相似的资产,因为它可以将重复的逻辑封装在一个函数中,从而减少代码冗余和提高可维护性。
Python实现资产工厂
让我们通过一个具体的例子来了解如何使用Python实现资产工厂。假设我们有一个团队需要频繁地从S3下载CSV文件,执行SQL查询,然后将结果上传回S3。我们可以定义一个资产工厂函数build_etl_job
,它接收S3资源配置、桶名、源文件、目标文件和SQL查询作为参数,并返回一组Dagster资产定义。
import tempfileimport dagster_aws.s3 as s3
import duckdbimport dagster as dgdef build_etl_job(s3_resource: s3.S3Resource,bucket: str,source_object: str,target_object: str,sql: str,
) -> dg.Definitions:# asset keys cannot contain '.'asset_key = f"etl_{bucket}_{target_object}".replace(".", "_")@dg.asset(name=asset_key)def etl_asset(context):with tempfile.TemporaryDirectory() as root:source_path = f"{root}/{source_object}"target_path = f"{root}/{target_object}"# these steps could be split into separate assets, but# for brevity we will keep them together.# 1. extractcontext.resources.s3.download_file(bucket, source_object, source_path)# 2. transformdb = duckdb.connect(":memory:")db.execute(f"CREATE TABLE source AS SELECT * FROM read_csv('{source_path}');")db.query(sql).to_csv(target_path)# 3. loadcontext.resources.s3.upload_file(bucket, target_object, target_path)return dg.Definitions(assets=[etl_asset],resources={"s3": s3_resource},)s3_resource = s3.S3Resource(aws_access_key_id="...", aws_secret_access_key="...")defs = dg.Definitions.merge(build_etl_job(s3_resource=s3_resource,bucket="my_bucket",source_object="raw_transactions.csv",target_object="cleaned_transactions.csv",sql="SELECT * FROM source WHERE amount IS NOT NULL;",),build_etl_job(s3_resource=s3_resource,bucket="my_bucket",source_object="all_customers.csv",target_object="risky_customers.csv",sql="SELECT * FROM source WHERE risk_score > 0.8;",),
)
通过这种方式,我们可以轻松地创建和管理多个相似的ETL任务,只需调用build_etl_job
函数并传入不同的配置参数即可。
使用YAML配置资产工厂
虽然Python实现已经大大简化了代码,但我们还可以进一步改进,使配置更加灵活和易于管理。我们可以使用YAML文件来定义ETL任务的配置,并通过Python代码解析YAML文件并创建资产定义。
etl_jobs.yaml
aws:access_key_id: "YOUR_ACCESS_KEY_ID"secret_access_key: "YOUR_SECRET_ACCESS_KEY"
etl_jobs:- bucket: my_bucketsource: raw_transactions.csvtarget: cleaned_transactions.csvsql: SELECT * FROM source WHERE amount IS NOT NULL- bucket: my_bucketsource: all_customers.csvtarget: risky_customers.csvsql: SELECT * FROM source WHERE risk_score > 0.8
然后,我们可以编写一个Python函数load_etl_jobs_from_yaml
来解析YAML文件并创建资产定义。
import dagster_aws.s3 as s3
import yamlimport dagster as dgdef build_etl_job(s3_resource: s3.S3Resource,bucket: str,source_object: str,target_object: str,sql: str,
) -> dg.Definitions:# Code from previous example omittedreturn dg.Definitions()def load_etl_jobs_from_yaml(yaml_path: str) -> dg.Definitions:config = yaml.safe_load(open(yaml_path))s3_resource = s3.S3Resource(aws_access_key_id=config["aws"]["access_key_id"],aws_secret_access_key=config["aws"]["secret_access_key"],)defs = []for job_config in config["etl_jobs"]:defs.append(build_etl_job(s3_resource=s3_resource,bucket=job_config["bucket"],source_object=job_config["source"],target_object=job_config["target"],sql=job_config["sql"],))return dg.Definitions.merge(*defs)defs = load_etl_jobs_from_yaml("etl_jobs.yaml")
用Pydantic 和 Jinja提升配置文件的类型安全性和灵活性
虽然YAML配置文件已经大大简化了配置过程,但它仍然存在一些问题,例如缺乏类型检查和安全性。为了解决这些问题,我们可以使用Pydantic来定义配置文件的schema,并使用Jinja来模板化配置文件中的环境变量。
首先定义etl_jobs.yaml文件:
aws:access_key_id: "{{ env.AWS_ACCESS_KEY_ID }}"secret_access_key: "{{ env.AWS_SECRET_ACCESS_KEY }}"etl_jobs:- bucket: my_bucketsource: raw_transactions.csvtarget: cleaned_transactions.csvsql: SELECT * FROM source WHERE amount IS NOT NULL- bucket: my_bucketsource: all_customers.csvtarget: risky_customers.csvsql: SELECT * FROM source WHERE risk_score > 0.8
python实现代码:
import os
from typing import Listimport dagster_aws.s3 as s3
import jinja2
import pydantic
import yamlimport dagster as dgdef build_etl_job(s3_resource: s3.S3Resource,bucket: str,source_object: str,target_object: str,sql: str,
) -> dg.Definitions:# Code from previous example omittedreturn dg.Definitions()class AwsConfig(pydantic.BaseModel):access_key_id: strsecret_access_key: strdef to_resource(self) -> s3.S3Resource:return s3.S3Resource(aws_access_key_id=self.access_key_id,aws_secret_access_key=self.secret_access_key,)class JobConfig(pydantic.BaseModel):bucket: strsource: strtarget: strsql: strdef to_etl_job(self, s3_resource: s3.S3Resource) -> dg.Definitions:return build_etl_job(s3_resource=s3_resource,bucket=self.bucket,source_object=self.source,target_object=self.target,sql=self.sql,)class EtlJobsConfig(pydantic.BaseModel):aws: AwsConfigetl_jobs: list[JobConfig]def to_definitions(self) -> dg.Definitions:s3_resource = self.aws.to_resource()return dg.Definitions.merge(*[job.to_etl_job(s3_resource) for job in self.etl_jobs])def load_etl_jobs_from_yaml(yaml_path: str) -> dg.Definitions:yaml_template = jinja2.Environment().from_string(open(yaml_path).read())config = yaml.safe_load(yaml_template.render(env=os.environ))return EtlJobsConfig.model_validate(config).to_definitions()defs = load_etl_jobs_from_yaml("etl_jobs_with_jinja.yaml")
通过这种方式,我们可以确保配置文件的类型安全性,并且可以轻松地引用环境变量,从而避免在配置文件中硬编码敏感信息。
总结
通过本文的介绍,我们了解了如何使用Dagster的资产工厂模式来高效管理和自动化重复的ETL任务。从Python实现到YAML配置,再到使用Pydantic和Jinja提升配置文件的类型安全性和灵活性,我们一步步地构建了一个强大而灵活的ETL任务管理框架。这种方法不仅可以减少代码冗余,提高可维护性,还可以使配置更加灵活和安全。希望本文能为你在数据工程领域的实践提供有价值的启发和帮助。