DAG Generation¶
Generate production-ready Airflow DAGs and Prefect flows from workflow YAML files.
Overview¶
The quicketl workflow generate command transforms your local workflow definitions into production orchestration code:
```bash
# Generate Airflow DAG
quicketl workflow generate workflows/medallion.yml --target airflow -o dags/medallion_dag.py

# Generate Prefect flow
quicketl workflow generate workflows/medallion.yml --target prefect -o flows/medallion_flow.py
```
This enables a powerful development workflow:
- Develop locally - Test workflows with `quicketl workflow run`
- Validate - Check configuration with `quicketl workflow validate`
- Generate - Create production DAG code
- Deploy - Copy generated code to your orchestrator
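End to end, that sequence looks roughly like this (a sketch; it assumes `workflow run` accepts the workflow file the same way `validate` and `generate` do):

```bash
quicketl workflow run workflows/medallion.yml        # develop and test locally
quicketl workflow validate workflows/medallion.yml   # validate the configuration
quicketl workflow generate workflows/medallion.yml --target airflow -o dags/medallion_dag.py
# then copy dags/medallion_dag.py to your orchestrator
```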
Airflow DAG Generation¶
Basic Usage¶
The command prints the generated Python code to stdout by default; use `--output` to save it to a file.
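For example, reusing the medallion workflow from the Overview:

```bash
# Print the generated DAG to stdout
quicketl workflow generate workflows/medallion.yml --target airflow

# Or write it straight into your dags/ folder
quicketl workflow generate workflows/medallion.yml --target airflow -o dags/medallion_dag.py
```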
Options¶
| Option | Description |
|---|---|
| `--target airflow` | Generate Airflow DAG |
| `--output`, `-o` | Output file path |
| `--dag-id` | Override DAG ID (defaults to workflow name) |
| `--schedule` | Cron schedule (e.g., `"0 0 * * *"`) |
Example¶
Given this workflow:
```yaml
# workflows/medallion.yml
name: medallion_etl
description: Bronze -> Silver pipeline

stages:
  - name: bronze
    parallel: true
    pipelines:
      - path: pipelines/bronze/ingest_users.yml
      - path: pipelines/bronze/ingest_events.yml

  - name: silver
    depends_on: [bronze]
    parallel: true
    pipelines:
      - path: pipelines/silver/clean_users.yml
      - path: pipelines/silver/clean_events.yml
```
Generate DAG:
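```bash
# --schedule sets the cron schedule in the generated DAG (daily, matching the output below)
quicketl workflow generate workflows/medallion.yml --target airflow --schedule "0 0 * * *"
```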
Output:
"""
Airflow DAG: medallion_etl
Bronze -> Silver pipeline
Generated by: quicketl workflow generate --target airflow
"""
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from quicketl.pipeline import run_pipeline
# =============================================================================
# Pipeline Tasks
# =============================================================================
def run_ingest_users(**context):
"""Run ingest_users pipeline."""
result = run_pipeline(
"workflows/../pipelines/bronze/ingest_users.yml",
)
if result.failed:
raise Exception(f"Pipeline ingest_users failed: {result.error}")
return result.to_dict()
def run_ingest_events(**context):
"""Run ingest_events pipeline."""
result = run_pipeline(
"workflows/../pipelines/bronze/ingest_events.yml",
)
if result.failed:
raise Exception(f"Pipeline ingest_events failed: {result.error}")
return result.to_dict()
def run_clean_users(**context):
"""Run clean_users pipeline."""
result = run_pipeline(
"workflows/../pipelines/silver/clean_users.yml",
)
if result.failed:
raise Exception(f"Pipeline clean_users failed: {result.error}")
return result.to_dict()
def run_clean_events(**context):
"""Run clean_events pipeline."""
result = run_pipeline(
"workflows/../pipelines/silver/clean_events.yml",
)
if result.failed:
raise Exception(f"Pipeline clean_events failed: {result.error}")
return result.to_dict()
# =============================================================================
# DAG Definition
# =============================================================================
default_args = {
"owner": "airflow",
"retries": 1,
}
with DAG(
dag_id="medallion_etl",
description="Bronze -> Silver pipeline",
schedule="0 0 * * *",
start_date=datetime(2025, 1, 1),
catchup=False,
default_args=default_args,
tags=["quicketl"],
) as dag:
bronze_ingest_users = PythonOperator(
task_id="bronze_ingest_users",
python_callable=run_ingest_users,
)
bronze_ingest_events = PythonOperator(
task_id="bronze_ingest_events",
python_callable=run_ingest_events,
)
silver_clean_users = PythonOperator(
task_id="silver_clean_users",
python_callable=run_clean_users,
)
silver_clean_events = PythonOperator(
task_id="silver_clean_events",
python_callable=run_clean_events,
)
# Dependencies
bronze_ingest_users >> silver_clean_users
bronze_ingest_users >> silver_clean_events
bronze_ingest_events >> silver_clean_users
bronze_ingest_events >> silver_clean_events
Customizing the DAG¶
After generation, you can customize the DAG:
- Add sensors or external triggers
- Modify retry logic
- Add alerting callbacks
- Integrate with Airflow Variables/Connections
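For example, a minimal sketch of two common tweaks to the generated `default_args`, raising retries and attaching an alerting callback (the `notify_on_failure` helper is hypothetical; swap in your own integration):

```python
from datetime import timedelta


def notify_on_failure(context):
    # Hypothetical alerting hook; replace with Slack/PagerDuty/etc.
    print(f"Task {context['task_instance'].task_id} failed")


default_args = {
    "owner": "data-eng",
    "retries": 3,                               # retry flaky pipelines a few more times
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": notify_on_failure,   # alert on task failure
}
```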
See Airflow Integration for advanced patterns.
Prefect Flow Generation¶
Basic Usage¶
As with the Airflow target, the flow code is printed to stdout by default. Save it to a file:
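```bash
quicketl workflow generate workflows/medallion.yml --target prefect -o flows/medallion_flow.py
```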
Options¶
| Option | Description |
|---|---|
| `--target prefect` | Generate Prefect flow |
| `--output`, `-o` | Output file path |
| `--dag-id` | Override flow name (defaults to workflow name) |
Example Output¶
"""
Prefect Flow: medallion_etl
Bronze -> Silver pipeline
Generated by: quicketl workflow generate --target prefect
"""
from prefect import flow, task
from prefect.futures import wait
from quicketl.pipeline import run_pipeline
# =============================================================================
# Pipeline Tasks
# =============================================================================
@task
def run_ingest_users():
"""Run ingest_users pipeline."""
result = run_pipeline(
"workflows/../pipelines/bronze/ingest_users.yml",
)
if result.failed:
raise Exception(f"Pipeline ingest_users failed: {result.error}")
return result
@task
def run_ingest_events():
"""Run ingest_events pipeline."""
result = run_pipeline(
"workflows/../pipelines/bronze/ingest_events.yml",
)
if result.failed:
raise Exception(f"Pipeline ingest_events failed: {result.error}")
return result
@task
def run_clean_users():
"""Run clean_users pipeline."""
result = run_pipeline(
"workflows/../pipelines/silver/clean_users.yml",
)
if result.failed:
raise Exception(f"Pipeline clean_users failed: {result.error}")
return result
@task
def run_clean_events():
"""Run clean_events pipeline."""
result = run_pipeline(
"workflows/../pipelines/silver/clean_events.yml",
)
if result.failed:
raise Exception(f"Pipeline clean_events failed: {result.error}")
return result
# =============================================================================
# Flow Definition
# =============================================================================
@flow(name="medallion_etl")
def medallion_etl():
"""
Bronze -> Silver pipeline
Stages:
- bronze
- silver (depends on: bronze)
"""
# Stage: bronze
bronze_results = []
bronze_results.append(run_ingest_users.submit())
bronze_results.append(run_ingest_events.submit())
# Wait for bronze stage
wait(bronze_results)
# Stage: silver
silver_results = []
silver_results.append(run_clean_users.submit())
silver_results.append(run_clean_events.submit())
return {
"bronze": bronze_results,
"silver": silver_results,
}
if __name__ == "__main__":
medallion_etl()
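Because the generated module ends with an `if __name__ == "__main__"` guard, you can execute it directly to exercise the flow before wiring it into a Prefect deployment:

```bash
# assumes the flow was saved as flows/medallion_flow.py, as in the Overview
python flows/medallion_flow.py
```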
Development Workflow¶
Recommended Pattern¶
```
┌─────────────────┐
│ Develop & Test  │
│     Locally     │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│    quicketl     │
│  workflow run   │◄──── Test with real data
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│    quicketl     │
│    workflow     │◄──── Validate config
│    validate     │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│    quicketl     │
│    workflow     │◄──── Generate DAG
│    generate     │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│   Deploy to     │
│    Airflow/     │◄──── Copy to dags/ folder
│    Prefect      │
└─────────────────┘
```
CI/CD Integration¶
```yaml
# .github/workflows/deploy-dags.yml
name: Deploy DAGs

on:
  push:
    branches: [main]
    paths:
      - 'workflows/**'
      - 'pipelines/**'

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Install QuickETL
        run: pip install quicketl

      - name: Validate workflows
        run: |
          for f in workflows/*.yml; do
            quicketl workflow validate "$f"
          done

      - name: Generate DAGs
        run: |
          for f in workflows/*.yml; do
            name=$(basename "$f" .yml)
            quicketl workflow generate "$f" --target airflow -o "dags/${name}_dag.py"
          done

      - name: Deploy to Airflow
        run: |
          # Copy to Airflow dags folder or S3/GCS bucket
          aws s3 sync dags/ s3://my-airflow-bucket/dags/
```
File Path Considerations¶
Generated DAG code uses paths relative to the workflow file. When deploying:
- Keep the same directory structure: Ensure the `pipelines/` folder is accessible from where the DAGs run
- Use absolute paths: Modify workflow variables to use absolute paths in production
- Mount shared storage: In Kubernetes/Docker, mount the same paths
Example with absolute paths:
```yaml
# workflows/medallion.yml
variables:
  BASE_PATH: /opt/airflow/pipelines

stages:
  - name: bronze
    pipelines:
      - path: ${BASE_PATH}/bronze/ingest_users.yml
```
Troubleshooting¶
"Pipeline file not found" in Airflow¶
The pipeline paths are relative to the workflow file location. Ensure:
- Pipeline files are deployed alongside the DAG
- The working directory is correct when the DAG runs
- Production workflows use absolute paths
"ModuleNotFoundError: quicketl"¶
Install QuickETL in your Airflow/Prefect environment:
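```bash
pip install quicketl
```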
Or add it to requirements.txt in your Airflow deployment.
Variables not substituted¶
Variables are resolved at runtime by QuickETL. Ensure:
- Environment variables are set in Airflow/Prefect environment
- Workflow variables are defined in YAML
- Use `${VAR}` syntax (not `$VAR` or `{{ var }}`)
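For instance, a small sketch reusing the medallion paths from the example above, with the same path written three ways (only the first form is substituted):

```yaml
variables:
  BASE_PATH: /opt/airflow/pipelines

stages:
  - name: bronze
    pipelines:
      - path: ${BASE_PATH}/bronze/ingest_users.yml       # substituted by QuickETL at runtime
      # - path: $BASE_PATH/bronze/ingest_users.yml       # NOT substituted
      # - path: {{ BASE_PATH }}/bronze/ingest_users.yml  # NOT substituted
```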