DAG Generation

Generate production-ready Airflow DAGs and Prefect flows from workflow YAML files.

Overview

The quicketl workflow generate command transforms your local workflow definitions into production orchestration code:

# Generate Airflow DAG
quicketl workflow generate workflows/medallion.yml --target airflow -o dags/medallion_dag.py

# Generate Prefect flow
quicketl workflow generate workflows/medallion.yml --target prefect -o flows/medallion_flow.py

This enables a powerful development workflow:

  1. Develop locally - Test workflows with quicketl workflow run
  2. Validate - Check configuration with quicketl workflow validate
  3. Generate - Create production DAG code
  4. Deploy - Copy generated code to your orchestrator

Airflow DAG Generation

Basic Usage

quicketl workflow generate workflows/medallion.yml --target airflow

Without -o, the generated Python code is printed to stdout. To save it to a file:

quicketl workflow generate workflows/medallion.yml --target airflow -o dags/medallion_dag.py

Options

Option            Description
----------------  -----------------------------------------------------
--target airflow  Generate an Airflow DAG
--output, -o      Output file path
--dag-id          Override the DAG ID (defaults to the workflow name)
--schedule        Cron schedule (e.g., "0 0 * * *")
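
These options can be combined; for example, to generate a daily DAG with a custom ID:

quicketl workflow generate workflows/medallion.yml --target airflow --dag-id medallion_daily --schedule "0 0 * * *" -o dags/medallion_daily.py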

Example

Given this workflow:

# workflows/medallion.yml
name: medallion_etl
description: Bronze -> Silver pipeline

stages:
  - name: bronze
    parallel: true
    pipelines:
      - path: pipelines/bronze/ingest_users.yml
      - path: pipelines/bronze/ingest_events.yml

  - name: silver
    depends_on: [bronze]
    parallel: true
    pipelines:
      - path: pipelines/silver/clean_users.yml
      - path: pipelines/silver/clean_events.yml

Generate DAG:

quicketl workflow generate workflows/medallion.yml --target airflow --schedule "0 0 * * *"

Output:

"""
Airflow DAG: medallion_etl

Bronze -> Silver pipeline

Generated by: quicketl workflow generate --target airflow
"""

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from quicketl.pipeline import run_pipeline


# =============================================================================
# Pipeline Tasks
# =============================================================================

def run_ingest_users(**context):
    """Run ingest_users pipeline."""
    result = run_pipeline(
        "workflows/../pipelines/bronze/ingest_users.yml",
    )
    if result.failed:
        raise Exception(f"Pipeline ingest_users failed: {result.error}")
    return result.to_dict()


def run_ingest_events(**context):
    """Run ingest_events pipeline."""
    result = run_pipeline(
        "workflows/../pipelines/bronze/ingest_events.yml",
    )
    if result.failed:
        raise Exception(f"Pipeline ingest_events failed: {result.error}")
    return result.to_dict()


def run_clean_users(**context):
    """Run clean_users pipeline."""
    result = run_pipeline(
        "workflows/../pipelines/silver/clean_users.yml",
    )
    if result.failed:
        raise Exception(f"Pipeline clean_users failed: {result.error}")
    return result.to_dict()


def run_clean_events(**context):
    """Run clean_events pipeline."""
    result = run_pipeline(
        "workflows/../pipelines/silver/clean_events.yml",
    )
    if result.failed:
        raise Exception(f"Pipeline clean_events failed: {result.error}")
    return result.to_dict()


# =============================================================================
# DAG Definition
# =============================================================================

default_args = {
    "owner": "airflow",
    "retries": 1,
}

with DAG(
    dag_id="medallion_etl",
    description="Bronze -> Silver pipeline",
    schedule="0 0 * * *",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    default_args=default_args,
    tags=["quicketl"],
) as dag:

    bronze_ingest_users = PythonOperator(
        task_id="bronze_ingest_users",
        python_callable=run_ingest_users,
    )

    bronze_ingest_events = PythonOperator(
        task_id="bronze_ingest_events",
        python_callable=run_ingest_events,
    )

    silver_clean_users = PythonOperator(
        task_id="silver_clean_users",
        python_callable=run_clean_users,
    )

    silver_clean_events = PythonOperator(
        task_id="silver_clean_events",
        python_callable=run_clean_events,
    )

    # Dependencies
    bronze_ingest_users >> silver_clean_users
    bronze_ingest_users >> silver_clean_events
    bronze_ingest_events >> silver_clean_users
    bronze_ingest_events >> silver_clean_events
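
To smoke-test the generated DAG outside the scheduler, one option (assuming Airflow 2.5+, which added the DAG.test() method) is to append a main guard to the generated file:

if __name__ == "__main__":
    # Runs all tasks in-process for a single logical date; handy for local debugging.
    dag.test()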

Customizing the DAG

After generation, you can customize the DAG (a sketch follows this list):

  • Add sensors or external triggers
  • Modify retry logic
  • Add alerting callbacks
  • Integrate with Airflow Variables/Connections
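
For example, a minimal sketch that wires a failure-alerting callback and stronger retries into default_args (notify_on_failure is a placeholder for your own alerting client):

from datetime import timedelta

def notify_on_failure(context):
    # Placeholder alert hook: swap in your Slack/PagerDuty/email client.
    ti = context["task_instance"]
    print(f"Task {ti.task_id} failed in DAG run {context['run_id']}")

default_args = {
    "owner": "data-platform",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": notify_on_failure,
}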

See Airflow Integration for advanced patterns.

Prefect Flow Generation

Basic Usage

quicketl workflow generate workflows/medallion.yml --target prefect

To save to a file:

quicketl workflow generate workflows/medallion.yml --target prefect -o flows/medallion_flow.py

Options

Option            Description
----------------  -----------------------------------------------------
--target prefect  Generate a Prefect flow
--output, -o      Output file path
--dag-id          Override the flow name (defaults to the workflow name)

Example Output

"""
Prefect Flow: medallion_etl

Bronze -> Silver pipeline

Generated by: quicketl workflow generate --target prefect
"""

from prefect import flow, task
from prefect.futures import wait

from quicketl.pipeline import run_pipeline


# =============================================================================
# Pipeline Tasks
# =============================================================================

@task
def run_ingest_users():
    """Run ingest_users pipeline."""
    result = run_pipeline(
        "workflows/../pipelines/bronze/ingest_users.yml",
    )
    if result.failed:
        raise Exception(f"Pipeline ingest_users failed: {result.error}")
    return result


@task
def run_ingest_events():
    """Run ingest_events pipeline."""
    result = run_pipeline(
        "workflows/../pipelines/bronze/ingest_events.yml",
    )
    if result.failed:
        raise Exception(f"Pipeline ingest_events failed: {result.error}")
    return result


@task
def run_clean_users():
    """Run clean_users pipeline."""
    result = run_pipeline(
        "workflows/../pipelines/silver/clean_users.yml",
    )
    if result.failed:
        raise Exception(f"Pipeline clean_users failed: {result.error}")
    return result


@task
def run_clean_events():
    """Run clean_events pipeline."""
    result = run_pipeline(
        "workflows/../pipelines/silver/clean_events.yml",
    )
    if result.failed:
        raise Exception(f"Pipeline clean_events failed: {result.error}")
    return result


# =============================================================================
# Flow Definition
# =============================================================================

@flow(name="medallion_etl")
def medallion_etl():
    """
    Bronze -> Silver pipeline

    Stages:
      - bronze
      - silver (depends on: bronze)
    """

    # Stage: bronze
    bronze_results = []
    bronze_results.append(run_ingest_users.submit())
    bronze_results.append(run_ingest_events.submit())

    # Wait for bronze stage
    wait(bronze_results)

    # Stage: silver
    silver_results = []
    silver_results.append(run_clean_users.submit())
    silver_results.append(run_clean_events.submit())

    return {
        "bronze": bronze_results,
        "silver": silver_results,
    }


if __name__ == "__main__":
    medallion_etl()
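
If you want Prefect itself to schedule the generated flow, one option (assuming a recent Prefect version, 2.10+ or 3.x, where Flow.serve is available) is to replace the main guard:

if __name__ == "__main__":
    # Starts a long-running process that triggers the flow on the given cron schedule.
    medallion_etl.serve(name="medallion-etl-daily", cron="0 0 * * *")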

Development Workflow

┌─────────────────┐
│  Develop & Test │
│   Locally       │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  quicketl       │
│  workflow run   │◄──── Test with real data
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  quicketl       │
│  workflow       │◄──── Validate config
│  validate       │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  quicketl       │
│  workflow       │◄──── Generate DAG
│  generate       │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Deploy to      │
│  Airflow/       │◄──── Copy to dags/ folder
│  Prefect        │
└─────────────────┘

CI/CD Integration

# .github/workflows/deploy-dags.yml
name: Deploy DAGs

on:
  push:
    branches: [main]
    paths:
      - 'workflows/**'
      - 'pipelines/**'

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Install QuickETL
        run: pip install quicketl

      - name: Validate workflows
        run: |
          for f in workflows/*.yml; do
            quicketl workflow validate "$f"
          done

      - name: Generate DAGs
        run: |
          for f in workflows/*.yml; do
            name=$(basename "$f" .yml)
            quicketl workflow generate "$f" --target airflow -o "dags/${name}_dag.py"
          done

      - name: Deploy to Airflow
        run: |
          # Copy to Airflow dags folder or S3/GCS bucket
          aws s3 sync dags/ s3://my-airflow-bucket/dags/

File Path Considerations

Generated DAG code uses paths relative to the workflow file. When deploying:

  1. Keep the same directory structure: Ensure the pipelines/ folder is accessible from wherever the DAGs run
  2. Use absolute paths: Modify workflow variables to use absolute paths in production
  3. Mount shared storage: In Kubernetes/Docker, mount the same paths into every container

Example with absolute paths:

# workflows/medallion.yml
variables:
  BASE_PATH: /opt/airflow/pipelines

stages:
  - name: bronze
    pipelines:
      - path: ${BASE_PATH}/bronze/ingest_users.yml
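
Another option is to edit the generated DAG so pipeline paths are anchored at the DAG file itself and resolve regardless of the scheduler's working directory. A minimal sketch of one rewritten task, using only the run_pipeline API shown in the generated output above:

from pathlib import Path

from quicketl.pipeline import run_pipeline

# Resolve pipeline files relative to this DAG file, not the
# scheduler's working directory.
BASE_DIR = Path(__file__).resolve().parent

def run_ingest_users(**context):
    result = run_pipeline(str(BASE_DIR / "pipelines" / "bronze" / "ingest_users.yml"))
    if result.failed:
        raise Exception(f"Pipeline ingest_users failed: {result.error}")
    return result.to_dict()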

Troubleshooting

"Pipeline file not found" in Airflow

The pipeline paths are relative to the workflow file location. Ensure that:

  1. Pipeline files are deployed alongside the DAG
  2. The working directory is correct when the DAG runs
  3. Production workflows use absolute paths (as shown above)

"ModuleNotFoundError: quicketl"

Install QuickETL in your Airflow/Prefect environment:

pip install quicketl

Or add quicketl to the requirements.txt of your Airflow deployment.

Variables not substituted

Variables are resolved at runtime by QuickETL. Ensure that:

  1. Environment variables are set in the Airflow/Prefect environment
  2. Workflow variables are defined in the YAML
  3. References use ${VAR} syntax (not $VAR or {{ var }})