Airflow DAG Example¶
This example demonstrates a complete Airflow DAG that orchestrates multiple QuickETL pipelines with proper error handling, alerting, and monitoring.
Overview¶
Scenario: Daily data pipeline that:
- Extracts data from multiple sources
- Transforms and validates data
- Loads to data warehouse
- Sends notifications on completion/failure
DAG Structure¶
extract_orders ───┬─► transform_data ─► load_warehouse ─► notify_success
extract_products ─┤                           │
extract_customers ┘                           └─► notify_failure (on error)
Complete DAG Code¶
Create dags/daily_etl_dag.py:
"""
Daily ETL Pipeline DAG
This DAG orchestrates the daily ETL process using QuickETL pipelines.
Runs daily at 6 AM UTC.
"""
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.operators.email import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from quicketl.integrations.airflow import quicketl_task
# Default arguments for all tasks
default_args = {
"owner": "data-team",
"depends_on_past": False,
"email": ["data-alerts@company.com"],
"email_on_failure": True,
"email_on_retry": False,
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True,
"max_retry_delay": timedelta(hours=1),
}
@dag(
dag_id="daily_etl_pipeline",
description="Daily ETL pipeline using QuickETL",
schedule="0 6 * * *", # 6 AM UTC daily
start_date=datetime(2025, 1, 1),
catchup=False,
default_args=default_args,
tags=["quicketl", "production", "daily"],
doc_md=__doc__,
)
def daily_etl_pipeline():
"""
Daily ETL pipeline that extracts, transforms, and loads data.
## Pipeline Steps
1. Extract: Pull data from source systems
2. Transform: Clean, validate, and aggregate
3. Load: Write to data warehouse
4. Notify: Send completion status
## Variables Required
- `quicketl_warehouse`: Name of the warehouse to load into (defaults to `ETL_WH`)
- `quicketl_s3_bucket`: S3 bucket for raw data
## Connections Required
- `postgres`: Source database connection used by the extract pipelines
- `snowflake_default`: Snowflake warehouse connection
"""
# =========================================================================
# EXTRACT PHASE
# =========================================================================
@quicketl_task(config="pipelines/extract/orders.yml")
def extract_orders(**context):
"""Extract orders from source system."""
return {
"DATE": context["ds"],
"EXECUTION_DATE": context["execution_date"].isoformat(),
}
@quicketl_task(config="pipelines/extract/products.yml")
def extract_products(**context):
"""Extract product catalog."""
return {"DATE": context["ds"]}
@quicketl_task(config="pipelines/extract/customers.yml")
def extract_customers(**context):
"""Extract customer data."""
return {"DATE": context["ds"]}
# =========================================================================
# TRANSFORM PHASE
# =========================================================================
@quicketl_task(config="pipelines/transform/daily_metrics.yml")
def transform_data(**context):
"""Transform and aggregate extracted data."""
return {
"DATE": context["ds"],
"PREV_DATE": context["prev_ds"],
}
# =========================================================================
# LOAD PHASE
# =========================================================================
@quicketl_task(
config="pipelines/load/warehouse.yml",
fail_on_checks=True,
)
def load_warehouse(**context):
"""Load transformed data to warehouse."""
from airflow.models import Variable
return {
"DATE": context["ds"],
"WAREHOUSE": Variable.get("quicketl_warehouse", default_var="ETL_WH"),
}
# =========================================================================
# NOTIFICATION PHASE
# =========================================================================
@task
def prepare_success_report(**context):
"""Prepare success notification with metrics."""
ti = context["task_instance"]
# Get results from upstream tasks
extract_orders_result = ti.xcom_pull(task_ids="extract_orders")
extract_products_result = ti.xcom_pull(task_ids="extract_products")
extract_customers_result = ti.xcom_pull(task_ids="extract_customers")
transform_result = ti.xcom_pull(task_ids="transform_data")
load_result = ti.xcom_pull(task_ids="load_warehouse")
# Calculate totals
total_rows = sum([
extract_orders_result.get("rows_written", 0),
extract_products_result.get("rows_written", 0),
extract_customers_result.get("rows_written", 0),
])
total_duration = sum([
extract_orders_result.get("duration_ms", 0),
extract_products_result.get("duration_ms", 0),
extract_customers_result.get("duration_ms", 0),
transform_result.get("duration_ms", 0),
load_result.get("duration_ms", 0),
])
return {
"date": context["ds"],
"total_rows_extracted": total_rows,
"rows_loaded": load_result.get("rows_written", 0),
"total_duration_ms": total_duration,
"checks_passed": load_result.get("checks_passed", 0),
}
notify_success = EmailOperator(
task_id="notify_success",
to=["data-team@company.com"],
subject="✓ Daily ETL Success: {{ ds }}",
html_content="""
<h2>Daily ETL Pipeline Completed Successfully</h2>
<p><strong>Date:</strong> {{ ds }}</p>
<p><strong>Run ID:</strong> {{ run_id }}</p>
<h3>Metrics</h3>
<ul>
<li>Rows Extracted: {{ ti.xcom_pull(task_ids='prepare_success_report')['total_rows_extracted'] }}</li>
<li>Rows Loaded: {{ ti.xcom_pull(task_ids='prepare_success_report')['rows_loaded'] }}</li>
<li>Duration: {{ ti.xcom_pull(task_ids='prepare_success_report')['total_duration_ms'] }}ms</li>
<li>Quality Checks Passed: {{ ti.xcom_pull(task_ids='prepare_success_report')['checks_passed'] }}</li>
</ul>
<p><a href="{{ conf.get('webserver', 'base_url') }}/dags/daily_etl_pipeline/grid">View in Airflow</a></p>
""",
)
notify_failure = EmailOperator(
task_id="notify_failure",
to=["data-alerts@company.com", "oncall@company.com"],
subject="✗ Daily ETL FAILED: {{ ds }}",
html_content="""
<h2>Daily ETL Pipeline Failed</h2>
<p><strong>Date:</strong> {{ ds }}</p>
<p><strong>Run ID:</strong> {{ run_id }}</p>
<p>Please investigate immediately.</p>
<p><a href="{{ conf.get('webserver', 'base_url') }}/dags/daily_etl_pipeline/grid">View in Airflow</a></p>
""",
trigger_rule=TriggerRule.ONE_FAILED,
)
# =========================================================================
# DAG DEPENDENCIES
# =========================================================================
# Extract phase (parallel)
orders = extract_orders()
products = extract_products()
customers = extract_customers()
# Transform phase (after all extracts)
transformed = transform_data()
[orders, products, customers] >> transformed
# Load phase
loaded = load_warehouse()
transformed >> loaded
# Notification phase
report = prepare_success_report()
loaded >> report >> notify_success
# Failure notification (triggers if any task fails)
[orders, products, customers, transformed, loaded] >> notify_failure
# Instantiate DAG
daily_etl_pipeline()
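Before deploying, check that the file imports cleanly and that a run works end to end. Airflow 2.5+ DAG objects expose a test() method that executes every task in-process without a scheduler; a minimal sketch (assuming dags/ is on your PYTHONPATH, and noting that the tasks run for real, so the same Variables and Connections must exist locally):
# smoke_test_daily_etl.py -- local debugging helper, not part of the DAG file above
from daily_etl_dag import daily_etl_pipeline

if __name__ == "__main__":
    # DAG.test() (Airflow 2.5+) runs all tasks sequentially in the current process.
    daily_etl_pipeline().test()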
Pipeline Configuration Files¶
Extract Orders (pipelines/extract/orders.yml)¶
name: extract_orders
description: Extract orders from source database
engine: duckdb
source:
type: database
connection: postgres
query: |
SELECT *
FROM orders
WHERE order_date = '${DATE}'
sink:
type: file
path: staging/orders/date=${DATE}/orders.parquet
format: parquet
mode: replace
Extract Products (pipelines/extract/products.yml)¶
name: extract_products
description: Extract product catalog
engine: duckdb
source:
type: database
connection: postgres
table: products
sink:
type: file
path: staging/products/products.parquet
format: parquet
mode: replace
Transform (pipelines/transform/daily_metrics.yml)¶
name: transform_daily_metrics
description: Transform and aggregate daily data
engine: duckdb
source:
type: file
path: staging/orders/date=${DATE}/*.parquet
format: parquet
transforms:
- op: join
right:
type: file
path: staging/products/products.parquet
format: parquet
on: [product_id]
how: left
- op: derive_column
name: line_total
expr: quantity * unit_price
- op: aggregate
group_by: [category, order_date]
aggregations:
total_revenue: sum(line_total)
order_count: count(distinct order_id)
checks:
- check: not_null
columns: [category, total_revenue]
- check: row_count
min: 1
sink:
type: file
path: staging/metrics/date=${DATE}/metrics.parquet
format: parquet
Load Warehouse (pipelines/load/warehouse.yml)¶
name: load_warehouse
description: Load metrics to data warehouse
engine: snowflake
source:
type: file
path: staging/metrics/date=${DATE}/*.parquet
format: parquet
transforms:
- op: derive_column
name: loaded_at
expr: current_timestamp()
checks:
- check: not_null
columns: [category, total_revenue, loaded_at]
- check: expression
expr: total_revenue >= 0
sink:
type: database
connection: snowflake_default
table: analytics.daily_metrics
mode: merge
merge_keys: [category, order_date]
Advanced Patterns¶
Dynamic Task Generation¶
@dag(...)
def dynamic_pipeline():
@task
def get_regions():
# Could come from database or API
return ["us-east", "us-west", "eu-west", "ap-south"]
@quicketl_task(config="pipelines/regional.yml")
def process_region(region, **context):
return {
"DATE": context["ds"],
"REGION": region,
}
regions = get_regions()
process_region.expand(region=regions)
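Each mapped call pushes its own XCom, so a downstream task can consume every regional result at once. A sketch of this pattern, replacing the last line of the example above (summarize is hypothetical and assumes each mapped run returns a result dict with a rows_written key, as prepare_success_report does earlier on this page):
@task
def summarize(results):
    """Aggregate the result dicts of all mapped process_region runs."""
    rows = [(r or {}).get("rows_written", 0) for r in results]
    print(f"Processed {sum(rows)} rows across {len(rows)} regions")

summarize(process_region.expand(region=regions))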
Conditional Execution¶
from airflow.operators.python import BranchPythonOperator
@dag(...)
def conditional_pipeline():
def choose_pipeline(**context):
# Monday = full refresh, other days = incremental
if context["execution_date"].weekday() == 0:
return "full_refresh"
return "incremental"
branch = BranchPythonOperator(
task_id="choose_pipeline",
python_callable=choose_pipeline,
)
@quicketl_task(config="pipelines/full_refresh.yml")
def full_refresh(**context):
return {"DATE": context["ds"]}
@quicketl_task(config="pipelines/incremental.yml")
def incremental(**context):
return {"DATE": context["ds"]}
branch >> [full_refresh(), incremental()]
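One caveat with branching: if both branches feed into a common downstream task, the branch that was skipped would normally cause that task to be skipped as well. Giving the join task a more permissive trigger rule avoids this; a sketch (publish_results is a hypothetical join task, and the wiring replaces the last line of the example above):
from airflow.decorators import task
from airflow.utils.trigger_rule import TriggerRule

@task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def publish_results(**context):
    """Runs after whichever branch actually executed."""
    ...

full = full_refresh()
incr = incremental()
branch >> [full, incr]
[full, incr] >> publish_results()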
SLA Monitoring¶
@dag(
sla_miss_callback=slack_sla_alert,
default_args={
"sla": timedelta(hours=2),
},
)
def sla_monitored_pipeline():
@quicketl_task(
config="pipelines/critical.yml",
sla=timedelta(hours=1), # Stricter SLA for this task
)
def critical_task(**context):
return {"DATE": context["ds"]}
Deployment¶
Project Structure¶
airflow/
├── dags/
│ ├── daily_etl_dag.py
│ └── weekly_etl_dag.py
├── pipelines/
│ ├── extract/
│ │ ├── orders.yml
│ │ ├── products.yml
│ │ └── customers.yml
│ ├── transform/
│ │ └── daily_metrics.yml
│ └── load/
│ └── warehouse.yml
└── requirements.txt
Requirements¶
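requirements.txt should pin Airflow, QuickETL, and the provider packages behind the connections used above. A minimal sketch (versions are placeholders; the quicketl package name is assumed):
apache-airflow>=2.5
apache-airflow-providers-postgres
apache-airflow-providers-snowflake
quicketl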
Airflow Variables¶
Set these in the Airflow UI or via the CLI:
airflow variables set quicketl_warehouse "ETL_WH"
airflow variables set quicketl_s3_bucket "data-lake-prod"
Airflow Connections¶
# PostgreSQL source
airflow connections add postgres \
--conn-type postgres \
--conn-host localhost \
--conn-login user \
--conn-password pass \
--conn-schema mydb
# Snowflake destination
airflow connections add snowflake_default \
--conn-type snowflake \
--conn-host xy12345.us-east-1 \
--conn-login etl_user \
--conn-password pass \
--conn-schema analytics
Next Steps¶
- Airflow Integration Guide - Complete reference
- Cloud ETL Example - Cloud-native pipelines
- Production Best Practices