Error Handling Best Practices¶
Strategies for handling errors gracefully in QuickETL pipelines.
Types of Errors¶
Configuration Errors¶
Invalid YAML or missing required fields:
$ quicketl validate pipeline.yml
Configuration is invalid

Errors:
  - sink: Field required
  - transforms -> 0 -> op: Input should be 'select', 'filter', ...
Prevention: Always validate before running:
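$ quicketl validate pipeline.yml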
Runtime Errors¶
Errors during execution:
- File not found
- Database connection failed
- Out of memory
- Permission denied
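These typically surface from `pipeline.run()` as exceptions. A minimal sketch, assuming runtime failures raise the `ExecutionError` used by the retry examples below:

from quicketl import Pipeline
from quicketl.exceptions import ExecutionError

pipeline = Pipeline.from_yaml("pipeline.yml")

try:
    result = pipeline.run()
except ExecutionError as e:
    # Connection failures, missing files, and similar errors land here;
    # log the cause, then retry or abort (see Retry Strategies below).
    print(f"Runtime failure: {e}")
    raise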
Data Quality Errors¶
Quality checks that fail:
Quality Checks: FAILED (2/3 passed)
✓ not_null: id, name
✗ unique: email (152 duplicates found)
✓ row_count: min=1
Quality Check Strategies¶
Critical vs Non-Critical Checks¶
checks:
  # Critical: must pass 100%
  - check: not_null
    columns: [id, customer_id]

  - check: unique
    columns: [id]

  # Non-critical: warning only (use threshold)
  - check: expression
    expr: email LIKE '%@%.%'
    threshold: 0.95  # 95% must pass

  - check: expression
    expr: amount > 0
    threshold: 0.99  # 99% must pass
Continue on Check Failure¶
For non-critical pipelines, let the run finish and report check failures instead of aborting. A minimal sketch using the same `fail_on_checks` flag as the programmatic example below:
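from quicketl import Pipeline

pipeline = Pipeline.from_yaml("pipeline.yml")

# With fail_on_checks=False, check failures are reported on the
# result instead of raising QualityCheckError
result = pipeline.run(fail_on_checks=False)
if result.checks_failed > 0:
    print(f"{result.checks_failed} check(s) failed; continuing anyway")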
Programmatic Handling¶
import logging

from quicketl import Pipeline
from quicketl.exceptions import QualityCheckError

logger = logging.getLogger(__name__)

pipeline = Pipeline.from_yaml("pipeline.yml")

try:
    result = pipeline.run(fail_on_checks=True)
except QualityCheckError as e:
    print("Quality checks failed:")
    for check in e.failed_checks:
        print(f"  - {check.name}: {check.message}")

    # Decide what to do
    if "critical" in e.failed_checks[0].name:
        raise  # Re-raise for critical failures
    else:
        # Log and continue for warnings
        logger.warning(f"Non-critical check failed: {e}")
Retry Strategies¶
Simple Retry Script¶
#!/bin/bash
MAX_RETRIES=3
RETRY_DELAY=60

for i in $(seq 1 $MAX_RETRIES); do
    if quicketl run pipeline.yml --var DATE="$1"; then
        echo "Success on attempt $i"
        exit 0
    fi
    if [ $i -lt $MAX_RETRIES ]; then
        echo "Attempt $i failed, retrying in ${RETRY_DELAY}s..."
        sleep $RETRY_DELAY
    fi
done

echo "Failed after $MAX_RETRIES attempts"
exit 1
Exponential Backoff¶
#!/bin/bash
MAX_RETRIES=5
BASE_DELAY=30

for i in $(seq 1 $MAX_RETRIES); do
    if quicketl run pipeline.yml; then
        exit 0
    fi
    if [ $i -lt $MAX_RETRIES ]; then
        DELAY=$((BASE_DELAY * 2 ** (i - 1)))
        echo "Retry $i in ${DELAY}s..."
        sleep $DELAY
    fi
done

exit 1
Python Retry¶
import time

from quicketl import Pipeline
from quicketl.exceptions import ExecutionError

def run_with_retry(config_path, max_retries=3, base_delay=30):
    pipeline = Pipeline.from_yaml(config_path)

    for attempt in range(1, max_retries + 1):
        try:
            return pipeline.run()
        except ExecutionError as e:
            if attempt == max_retries:
                raise
            delay = base_delay * (2 ** (attempt - 1))
            print(f"Attempt {attempt} failed: {e}")
            print(f"Retrying in {delay}s...")
            time.sleep(delay)
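Callers receive the successful result, or the final `ExecutionError` once retries are exhausted:

result = run_with_retry("pipeline.yml", max_retries=5, base_delay=10)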
Logging and Monitoring¶
Verbose Output¶
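The flag is assumed here as `--verbose` (confirm with `quicketl run --help`):

$ quicketl run pipeline.yml --verbose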
Shows detailed step-by-step execution.
JSON Output for Monitoring¶
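The `--json` flag (also used by the monitoring script below) emits a machine-readable run summary:

$ quicketl run pipeline.yml --json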
{
  "pipeline_name": "daily_sales",
  "status": "SUCCESS",
  "duration_ms": 1234.5,
  "rows_processed": 10000,
  "rows_written": 9500,
  "checks_passed": 3,
  "checks_failed": 0
}
Send to Monitoring System¶
#!/bin/bash
RESULT=$(quicketl run pipeline.yml --json)

STATUS=$(echo "$RESULT" | jq -r '.status')
DURATION=$(echo "$RESULT" | jq -r '.duration_ms')
ROWS=$(echo "$RESULT" | jq -r '.rows_written')

# Send to Datadog
curl -X POST "https://api.datadoghq.com/api/v1/series" \
    -H "Content-Type: application/json" \
    -H "DD-API-KEY: $DD_API_KEY" \
    -d "{
        \"series\": [{
            \"metric\": \"quicketl.pipeline.duration\",
            \"points\": [[$(date +%s), $DURATION]],
            \"tags\": [\"pipeline:daily_sales\", \"status:$STATUS\"]
        }]
    }"
Error Recovery Patterns¶
Idempotent Pipelines¶
Design pipelines that can be safely re-run:
# Replace mode: safe to re-run
sink:
  type: database
  connection: postgres
  table: analytics.daily_metrics
  mode: replace

# Or use merge with keys
sink:
  type: database
  connection: postgres
  table: analytics.daily_metrics
  mode: merge
  merge_keys: [date, region]  # Unique identifier
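Both modes make a re-run safe: `replace` overwrites the previous output wholesale, while `merge` updates rows that match the merge keys instead of inserting duplicates, so running the same day twice leaves the table in the same state.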
Checkpoint Pattern¶
For long-running pipelines, break the work into restartable steps. Each step reads the previous step's output file, so after a failure you can resume from the last completed checkpoint instead of starting over (a driver script follows the configs):
# Step 1: Extract (can re-run)
name: extract_raw
sink:
  type: file
  path: staging/raw_${DATE}.parquet

# Step 2: Transform (starts from checkpoint)
name: transform_data
source:
  type: file
  path: staging/raw_${DATE}.parquet  # Checkpoint
sink:
  type: file
  path: staging/transformed_${DATE}.parquet

# Step 3: Load (final step)
name: load_warehouse
source:
  type: file
  path: staging/transformed_${DATE}.parquet
sink:
  type: database
  connection: snowflake
  table: analytics.metrics
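With each step saved as its own config (file names below are assumed to match the pipeline names above), a small driver runs them in order and stops at the first failure. Because every step writes to a checkpoint file, a re-run is safe, and you can restart manually from the last completed step:

#!/bin/bash
set -e  # Stop at the first failing step
DATE=$1

quicketl run extract_raw.yml --var DATE="$DATE"
quicketl run transform_data.yml --var DATE="$DATE"
quicketl run load_warehouse.yml --var DATE="$DATE"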
Dead Letter Queue¶
Capture failed records:
import logging
import shutil
from datetime import date

from quicketl import Pipeline

logger = logging.getLogger(__name__)

# Hypothetical input batch; substitute the file your pipeline reads
input_file = "data/batch.parquet"
today = date.today().isoformat()

pipeline = Pipeline.from_yaml("pipeline.yml")

try:
    result = pipeline.run(fail_on_checks=False)
    if result.checks_failed > 0:
        # Get failed records
        failed_df = result.get_failed_records()
        # Write to dead letter queue
        failed_df.to_parquet(f"dlq/failed_{today}.parquet")
except Exception as e:
    # Log entire batch to DLQ
    logger.error(f"Pipeline failed: {e}")
    shutil.copy(input_file, f"dlq/failed_batch_{today}.parquet")
Alerting¶
Email on Failure¶
#!/bin/bash
if ! quicketl run pipeline.yml; then
    echo "Pipeline failed at $(date)" | \
        mail -s "ALERT: ETL Pipeline Failed" team@company.com
    exit 1
fi
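Note that this relies on a configured mail transfer agent on the host (the `mail` command ships with packages such as mailutils).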
Slack Notification¶
import requests

from quicketl import Pipeline

SLACK_WEBHOOK = "https://hooks.slack.com/services/..."

def notify_slack(message, color="danger"):
    requests.post(SLACK_WEBHOOK, json={
        "attachments": [{
            "color": color,
            "text": message
        }]
    })

try:
    pipeline = Pipeline.from_yaml("pipeline.yml")
    result = pipeline.run()
    notify_slack(
        f"✓ Pipeline completed: {result.rows_written} rows",
        color="good"
    )
except Exception as e:
    notify_slack(f"✗ Pipeline failed: {e}")
    raise
PagerDuty Integration¶
#!/bin/bash
if ! quicketl run pipeline.yml; then
    curl -X POST https://events.pagerduty.com/v2/enqueue \
        -H "Content-Type: application/json" \
        -d '{
            "routing_key": "'"$PD_ROUTING_KEY"'",
            "event_action": "trigger",
            "payload": {
                "summary": "ETL Pipeline Failed",
                "severity": "critical",
                "source": "quicketl"
            }
        }'
    exit 1
fi
Debugging Tips¶
Dry Run¶
Test without writing output:
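The flag is assumed here as `--dry-run`; confirm the exact name with `quicketl run --help`:

$ quicketl run pipeline.yml --dry-run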
Verbose Logging¶
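Re-run the pipeline with verbose output enabled (see Verbose Output above) to trace exactly which step fails.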
Validate Configuration¶
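$ quicketl validate pipeline.yml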
Check Backend Availability¶
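Before debugging the pipeline itself, confirm that any external database or engine the pipeline targets is reachable from the host, for example with that system's own client tools.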
Test with Sample Data¶
Create a small test dataset:
head -n 100 data/large_file.csv > data/test_sample.csv
quicketl run pipeline.yml --var INPUT=data/test_sample.csv
Related¶
- Quality Checks - Check configuration
- Production - Production deployment
- Troubleshooting - Common issues