Airflow Integration

QuickETL provides first-class integration with Apache Airflow for orchestrating data pipelines. Run QuickETL pipelines as Airflow tasks with proper dependency management, retries, and monitoring.

Installation

pip install "quicketl[airflow]"
# or
uv add "quicketl[airflow]"
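
To confirm the extra installed correctly, one quick check is importing the integration's entry points (the same names used throughout this page):

python -c "from quicketl.integrations.airflow import QuickETLOperator, quicketl_task; print('ok')"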

Quick Start

Using the Decorator

The simplest way to run QuickETL in Airflow:

from airflow.decorators import dag
from datetime import datetime
from quicketl.integrations.airflow import quicketl_task

@dag(
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False
)
def sales_pipeline():

    @quicketl_task(config="pipelines/daily_sales.yml")
    def process_sales(**context):
        # Return variables to pass to pipeline
        return {
            "DATE": context["ds"],
            "REGION": "all"
        }

    process_sales()

sales_pipeline()

Using the Operator

For more control, use the operator directly:

from airflow import DAG
from datetime import datetime
from quicketl.integrations.airflow import QuickETLOperator

with DAG(
    "sales_etl",
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False
) as dag:

    process_sales = QuickETLOperator(
        task_id="process_sales",
        config_path="pipelines/daily_sales.yml",
        variables={
            "DATE": "{{ ds }}",
            "REGION": "{{ var.value.region }}"
        },
        engine="duckdb",
        fail_on_checks=True
    )

QuickETLOperator Reference

Parameters

Parameter       Type   Default   Description
config_path     str    Required  Path to YAML config
variables       dict   None      Variables to pass (supports Jinja)
engine          str    None      Override engine
fail_on_checks  bool   True      Fail task on check failure
dry_run         bool   False     Execute without writing
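
For example, dry_run and fail_on_checks can be combined for a validation-only task. A sketch; the DAG and task names are illustrative:

from airflow import DAG
from datetime import datetime
from quicketl.integrations.airflow import QuickETLOperator

with DAG(
    "sales_etl_validation",
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False
) as dag:

    # Run the pipeline and its quality checks without writing any output
    validate_sales = QuickETLOperator(
        task_id="validate_sales",
        config_path="pipelines/daily_sales.yml",
        variables={"DATE": "{{ ds }}", "REGION": "all"},
        dry_run=True,
        fail_on_checks=True
    )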

Jinja Templating

Variables support Airflow's Jinja templating:

QuickETLOperator(
    task_id="process",
    config_path="pipelines/sales.yml",
    variables={
        "DATE": "{{ ds }}",
        "EXECUTION_DATE": "{{ execution_date }}",
        "PREV_DATE": "{{ prev_ds }}",
        "NEXT_DATE": "{{ next_ds }}",
        "RUN_ID": "{{ run_id }}",
        "DAG_ID": "{{ dag.dag_id }}",
        "CUSTOM_VAR": "{{ var.value.my_variable }}",
        "CONNECTION": "{{ conn.my_connection.host }}"
    }
)

@quicketl_task Decorator

Parameters

Parameter       Type   Default   Description
config          str    Required  Path to YAML config
engine          str    None      Override engine
fail_on_checks  bool   True      Fail on check failure
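
engine and fail_on_checks work the same way as on the operator. A sketch, reusing the DuckDB engine shown earlier:

@quicketl_task(
    config="pipelines/sales.yml",
    engine="duckdb",        # override the engine from the YAML config
    fail_on_checks=False    # record check results without failing the task
)
def process_sales(**context):
    return {"DATE": context["ds"]}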

Return Values

The decorated function should return a dictionary of variables:

@quicketl_task(config="pipelines/sales.yml")
def process_sales(**context):
    return {
        "DATE": context["ds"],
        "BATCH_ID": context["run_id"]
    }

XCom Integration

Results are automatically pushed to XCom:

@dag(schedule="@daily", start_date=datetime(2025, 1, 1))
def pipeline():

    @quicketl_task(config="pipelines/extract.yml")
    def extract(**context):
        return {"DATE": context["ds"]}

    @task
    def log_results(result):
        print(f"Rows processed: {result['rows_processed']}")
        print(f"Duration: {result['duration_ms']}ms")

    result = extract()
    log_results(result)

DAG Patterns

Sequential Pipeline

from airflow.decorators import dag, task
from datetime import datetime
from quicketl.integrations.airflow import quicketl_task

@dag(schedule="@daily", start_date=datetime(2025, 1, 1))
def sequential_pipeline():

    @quicketl_task(config="pipelines/extract.yml")
    def extract(**context):
        return {"DATE": context["ds"]}

    @quicketl_task(config="pipelines/transform.yml")
    def transform(**context):
        return {"DATE": context["ds"]}

    @quicketl_task(config="pipelines/load.yml")
    def load(**context):
        return {"DATE": context["ds"]}

    extract() >> transform() >> load()

Parallel Processing

@dag(schedule="@daily", start_date=datetime(2025, 1, 1))
def parallel_pipeline():

    @quicketl_task(config="pipelines/sales.yml")
    def process_sales(**context):
        return {"DATE": context["ds"]}

    @quicketl_task(config="pipelines/inventory.yml")
    def process_inventory(**context):
        return {"DATE": context["ds"]}

    @quicketl_task(config="pipelines/aggregate.yml")
    def aggregate(**context):
        return {"DATE": context["ds"]}

    # Parallel tasks feed into aggregate
    [process_sales(), process_inventory()] >> aggregate()

Dynamic Task Mapping

@dag(schedule="@daily", start_date=datetime(2025, 1, 1))
def dynamic_pipeline():

    @task
    def get_regions():
        return ["north", "south", "east", "west"]

    @quicketl_task(config="pipelines/regional_sales.yml")
    def process_region(region, **context):
        return {
            "DATE": context["ds"],
            "REGION": region
        }

    regions = get_regions()
    process_region.expand(region=regions)

Conditional Execution

from airflow.operators.python import BranchPythonOperator

@dag(schedule="@daily", start_date=datetime(2025, 1, 1))
def conditional_pipeline():

    def choose_path(**context):
        day = context["execution_date"].weekday()
        if day == 0:  # Monday
            return "full_refresh"
        return "incremental"

    branch = BranchPythonOperator(
        task_id="choose_path",
        python_callable=choose_path
    )

    @quicketl_task(config="pipelines/full_refresh.yml")
    def full_refresh(**context):
        return {"DATE": context["ds"]}

    @quicketl_task(config="pipelines/incremental.yml")
    def incremental(**context):
        return {"DATE": context["ds"]}

    branch >> [full_refresh(), incremental()]
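
If another task must run after whichever branch was taken, it needs a trigger rule that tolerates the skipped branch. A sketch replacing the final dependency line above; the join task is illustrative:

    from airflow.operators.empty import EmptyOperator  # usually placed at module level

    join = EmptyOperator(
        task_id="join",
        trigger_rule="none_failed_min_one_success"  # run once the chosen branch finishes
    )

    # Keep references to the task calls so both branches can feed the join
    full = full_refresh()
    incr = incremental()
    branch >> [full, incr] >> join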

Error Handling

Retry Configuration

from airflow.decorators import dag
from datetime import datetime, timedelta

@dag(
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
        "retry_exponential_backoff": True,
        "max_retry_delay": timedelta(hours=1)
    }
)
def pipeline_with_retries():
    @quicketl_task(config="pipelines/sales.yml")
    def process(**context):
        return {"DATE": context["ds"]}

    process()

Custom Error Handling

from airflow.decorators import dag, task
from datetime import datetime
from quicketl import Pipeline
from quicketl.exceptions import QualityCheckError

@dag(schedule="@daily", start_date=datetime(2025, 1, 1))
def pipeline_with_error_handling():

    @task
    def run_pipeline(**context):
        pipeline = Pipeline.from_yaml("pipelines/sales.yml")

        try:
            result = pipeline.run(
                variables={"DATE": context["ds"]},
                fail_on_checks=True
            )
            return result.to_dict()

        except QualityCheckError as e:
            # Log warning but don't fail
            print(f"Quality checks failed: {e}")
            return {"status": "WARNING", "checks_failed": len(e.failed_checks)}

    run_pipeline()

Alerting on Failure

from airflow.decorators import dag
from airflow.operators.email import EmailOperator
from datetime import datetime
from quicketl.integrations.airflow import quicketl_task

@dag(schedule="@daily", start_date=datetime(2025, 1, 1))
def pipeline_with_alerts():

    @quicketl_task(config="pipelines/sales.yml")
    def process(**context):
        return {"DATE": context["ds"]}

    alert = EmailOperator(
        task_id="send_alert",
        to="team@example.com",
        subject="Pipeline Failed: {{ dag.dag_id }}",
        html_content="Task failed at {{ ts }}",
        trigger_rule="one_failed"
    )

    process() >> alert

Best Practices

1. Use Variables for Configuration

# Airflow Variables (set via the UI or CLI):
#   quicketl_database_url = postgresql://...
#   quicketl_s3_bucket = my-bucket

@quicketl_task(config="pipelines/sales.yml")
def process(**context):
    from airflow.models import Variable
    return {
        "DATABASE_URL": Variable.get("quicketl_database_url"),
        "S3_BUCKET": Variable.get("quicketl_s3_bucket"),
        "DATE": context["ds"]
    }

2. Use Connections for Secrets

@task
def run_with_connection(**context):
    from airflow.hooks.base import BaseHook
    from quicketl import Pipeline

    conn = BaseHook.get_connection("my_database")

    import os
    os.environ["POSTGRES_HOST"] = conn.host
    os.environ["POSTGRES_USER"] = conn.login
    os.environ["POSTGRES_PASSWORD"] = conn.password
    os.environ["POSTGRES_DATABASE"] = conn.schema

    pipeline = Pipeline.from_yaml("pipelines/sales.yml")
    return pipeline.run(variables={"DATE": context["ds"]}).to_dict()

3. Organize Pipeline Files

dags/
├── sales_dag.py
├── inventory_dag.py
└── pipelines/
    ├── sales/
    │   ├── extract.yml
    │   ├── transform.yml
    │   └── load.yml
    └── inventory/
        ├── daily.yml
        └── weekly.yml
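
DAG files don't run from a predictable working directory, so one option is to resolve config paths relative to the DAG file itself. A sketch assuming the layout above:

from pathlib import Path
from quicketl.integrations.airflow import quicketl_task

# dags/sales_dag.py -> dags/pipelines/sales/
PIPELINES_DIR = Path(__file__).parent / "pipelines" / "sales"

@quicketl_task(config=str(PIPELINES_DIR / "extract.yml"))
def extract(**context):
    return {"DATE": context["ds"]}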

4. Use SLAs

from datetime import datetime, timedelta

def sla_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    # Called by Airflow when a task misses its SLA
    print(f"SLA missed in {dag.dag_id}: {task_list}")

@dag(
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    sla_miss_callback=sla_alert
)
def pipeline_with_sla():

    @quicketl_task(
        config="pipelines/sales.yml",
        sla=timedelta(hours=2)
    )
    def process(**context):
        return {"DATE": context["ds"]}

    process()

Monitoring

Task Metrics

Access metrics from XCom:

@dag(schedule="@daily", start_date=datetime(2025, 1, 1))
def monitored_pipeline():

    @quicketl_task(config="pipelines/sales.yml")
    def process(**context):
        return {"DATE": context["ds"]}

    @task
    def log_metrics(result):
        # Send to monitoring system
        print(f"Pipeline: {result['pipeline_name']}")
        print(f"Duration: {result['duration_ms']}ms")
        print(f"Rows: {result['rows_processed']}{result['rows_written']}")
        print(f"Checks: {result['checks_passed']}/{result['checks_passed'] + result['checks_failed']}")

    result = process()
    log_metrics(result)

Custom Callbacks

def on_success(context):
    ti = context["task_instance"]
    result = ti.xcom_pull(task_ids=ti.task_id)
    # Send to monitoring
    print(f"Success: {result['rows_written']} rows")

def on_failure(context):
    # Send alert
    print(f"Failed: {context['exception']}")

@quicketl_task(
    config="pipelines/sales.yml",
    on_success_callback=on_success,
    on_failure_callback=on_failure
)
def process(**context):
    return {"DATE": context["ds"]}