Production Best Practices¶
Guidelines for running QuickETL pipelines reliably in production environments.
Environment Configuration¶
Use Environment Variables¶
Never hardcode credentials or environment-specific values:
# Good: Environment variables
source:
  type: database
  connection: ${DATABASE_CONNECTION}

sink:
  type: file
  path: ${OUTPUT_BUCKET}/data/${DATE}/
# Set in environment
export DATABASE_CONNECTION=postgres_prod
export OUTPUT_BUCKET=s3://prod-data-lake
export DATE=$(date +%Y-%m-%d)
quicketl run pipeline.yml
Use .env Files¶
For local development and deployment:
# .env.production
DATABASE_URL=postgresql://user:pass@prod-db:5432/analytics
S3_BUCKET=prod-data-lake
SNOWFLAKE_ACCOUNT=xy12345.us-east-1
SNOWFLAKE_USER=etl_service
SNOWFLAKE_PASSWORD=${SNOWFLAKE_PASSWORD} # From secrets manager
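To load one of these files in a shell before running a pipeline, a simple approach (assuming plain KEY=VALUE lines, as above) is to export everything it defines:
# Export all variables from the .env file, then run the pipeline
set -a
source .env.production
set +a
quicketl run pipeline.yml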
Secrets Management¶
Never commit secrets to git.
Use a secrets manager instead:
# AWS Secrets Manager
export DB_PASSWORD=$(aws secretsmanager get-secret-value \
--secret-id prod/quicketl/db-password \
--query SecretString --output text)
# HashiCorp Vault
export DB_PASSWORD=$(vault kv get -field=password secret/quicketl/database)
# Google Secret Manager
export DB_PASSWORD=$(gcloud secrets versions access latest --secret=db-password)
Monitoring and Observability¶
Structured Logging¶
Use JSON output for parsing:
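As a minimal sketch, the --json flag (used again by the metrics script below) prints a machine-readable result on stdout that can be filtered with jq; the field names shown match those extracted in that script:
# Run the pipeline and pull a few result fields out of the JSON output
quicketl run pipeline.yml --json | jq '{status, duration_ms, rows_written}'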
Metrics Collection¶
#!/bin/bash
# run_pipeline.sh
START_TIME=$(date +%s)
RESULT=$(quicketl run pipeline.yml --json)
END_TIME=$(date +%s)
# Extract metrics (quote $RESULT so the JSON reaches jq intact)
STATUS=$(echo "$RESULT" | jq -r '.status')
DURATION=$(echo "$RESULT" | jq -r '.duration_ms')
ROWS=$(echo "$RESULT" | jq -r '.rows_written')
CHECKS_PASSED=$(echo "$RESULT" | jq -r '.checks_passed')
CHECKS_FAILED=$(echo "$RESULT" | jq -r '.checks_failed')
# Send to monitoring system (send_metrics is a site-specific helper,
# e.g. a thin statsd or CloudWatch wrapper)
send_metrics "quicketl.pipeline.duration" "$DURATION"
send_metrics "quicketl.pipeline.rows" "$ROWS"
send_metrics "quicketl.pipeline.status" $([ "$STATUS" = "SUCCESS" ] && echo 1 || echo 0)
DataDog Integration¶
from datadog import statsd
from quicketl import Pipeline

pipeline = Pipeline.from_yaml("pipeline.yml")

with statsd.timed("quicketl.pipeline.duration", tags=["pipeline:daily_sales"]):
    result = pipeline.run()

statsd.gauge("quicketl.pipeline.rows_written", result.rows_written,
             tags=["pipeline:daily_sales"])
statsd.gauge("quicketl.pipeline.checks_passed", result.checks_passed,
             tags=["pipeline:daily_sales"])
Health Checks¶
# health_check.py
from quicketl import QuickETLEngine


def check_backends():
    """Verify backend availability."""
    backends = ["duckdb", "postgres"]
    results = {}
    for backend in backends:
        try:
            engine = QuickETLEngine(backend=backend)
            results[backend] = "healthy"
        except Exception as e:
            results[backend] = f"unhealthy: {e}"
    return results


if __name__ == "__main__":
    import json
    print(json.dumps(check_backends()))
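Because the script prints JSON, it is easy to wire into a probe or a cron-based alert; for example, this exits non-zero if any backend reports unhealthy:
# Fail (non-zero exit) when any backend is not healthy
python health_check.py | jq -e 'all(.[]; . == "healthy")' > /dev/null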
Scheduling¶
Cron¶
# /etc/cron.d/quicketl
# Daily at 6 AM UTC (cron.d format: the sixth field, here the quicketl user, runs the command)
0 6 * * * quicketl /opt/quicketl/run_pipeline.sh daily_sales >> /var/log/quicketl/cron.log 2>&1
# Hourly
0 * * * * quicketl /opt/quicketl/run_pipeline.sh hourly_metrics >> /var/log/quicketl/cron.log 2>&1
Systemd Timer¶
# /etc/systemd/system/quicketl-daily.service
[Unit]
Description=QuickETL Daily Pipeline
After=network.target
[Service]
Type=oneshot
User=quicketl
ExecStart=/opt/quicketl/run_pipeline.sh daily_sales
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
# /etc/systemd/system/quicketl-daily.timer
[Unit]
Description=Run QuickETL Daily Pipeline
[Timer]
OnCalendar=*-*-* 06:00:00
Persistent=true
[Install]
WantedBy=timers.target
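Enable and start the timer unit (not the service) so systemd owns the schedule:
sudo systemctl daemon-reload
sudo systemctl enable --now quicketl-daily.timer
systemctl list-timers quicketl-daily.timer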
Orchestrators¶
For complex workflows, use an orchestrator (a minimal Airflow sketch follows the list):
- Airflow: Airflow Integration
- Prefect: Task-based orchestration
- Dagster: Software-defined assets
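As a rough sketch only (see the Airflow Integration page for the supported pattern), a pipeline can be invoked from a DAG with a BashOperator; the DAG id, schedule, and paths below are illustrative:
# Illustrative Airflow DAG that shells out to the QuickETL CLI.
# Assumes Airflow 2.4+ (for the `schedule` argument) and that quicketl
# is installed on the worker; adjust ids and paths to your environment.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="quicketl_daily_sales",
    schedule="0 6 * * *",  # mirrors the cron/systemd schedule above
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    BashOperator(
        task_id="run_pipeline",
        # {{ ds }} is Airflow's logical date, passed through as DATE
        bash_command="quicketl run /opt/quicketl/pipelines/daily_sales.yml --var DATE={{ ds }}",
    )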
Error Handling¶
Retry Logic¶
#!/bin/bash
# run_with_retry.sh
MAX_RETRIES=3
RETRY_DELAY=300 # 5 minutes
for i in $(seq 1 $MAX_RETRIES); do
if quicketl run pipeline.yml --var DATE="$1"; then
echo "$(date): Pipeline succeeded on attempt $i"
exit 0
fi
if [ $i -lt $MAX_RETRIES ]; then
echo "$(date): Attempt $i failed, retrying in ${RETRY_DELAY}s..."
sleep $RETRY_DELAY
fi
done
echo "$(date): Pipeline failed after $MAX_RETRIES attempts"
exit 1
Alerting¶
#!/bin/bash
# run_pipeline.sh
PIPELINE_NAME=$1
DATE=${2:-$(date +%Y-%m-%d)}
if ! quicketl run "pipelines/${PIPELINE_NAME}.yml" --var DATE="$DATE"; then
    # Send alert to Slack
    curl -X POST "$SLACK_WEBHOOK" \
        -H "Content-Type: application/json" \
        -d "{
            \"text\": \"🚨 Pipeline Failed: ${PIPELINE_NAME}\",
            \"attachments\": [{
                \"color\": \"danger\",
                \"fields\": [
                    {\"title\": \"Pipeline\", \"value\": \"${PIPELINE_NAME}\", \"short\": true},
                    {\"title\": \"Date\", \"value\": \"${DATE}\", \"short\": true}
                ]
            }]
        }"
    exit 1
fi
PagerDuty for Critical Pipelines¶
if ! quicketl run critical_pipeline.yml; then
    curl -X POST https://events.pagerduty.com/v2/enqueue \
        -H "Content-Type: application/json" \
        -d "{
            \"routing_key\": \"$PAGERDUTY_KEY\",
            \"event_action\": \"trigger\",
            \"payload\": {
                \"summary\": \"Critical ETL Pipeline Failed\",
                \"severity\": \"critical\",
                \"source\": \"quicketl-prod\"
            }
        }"
fi
Deployment¶
Docker¶
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
# Install QuickETL with required backends
RUN pip install "quicketl[duckdb,postgres,snowflake]"
# Copy pipelines
COPY pipelines/ /app/pipelines/
# Run as non-root user
RUN useradd -m quicketl
USER quicketl
ENTRYPOINT ["quicketl"]
CMD ["--help"]
# Build and run
docker build -t quicketl-pipelines .
docker run --env-file .env quicketl-pipelines run pipelines/daily.yml
Kubernetes¶
# k8s/quicketl-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: quicketl-daily-sales
spec:
  template:
    spec:
      containers:
        - name: quicketl
          image: quicketl-pipelines:latest
          command: ["quicketl", "run", "pipelines/daily_sales.yml"]
          env:
            - name: DATE
              value: "2025-01-15"
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: quicketl-secrets
                  key: database-url
          resources:
            requests:
              memory: "2Gi"
              cpu: "1"
            limits:
              memory: "4Gi"
              cpu: "2"
      restartPolicy: OnFailure
  backoffLimit: 3
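To launch the Job and follow its outcome:
kubectl apply -f k8s/quicketl-job.yaml
kubectl wait --for=condition=complete --timeout=2h job/quicketl-daily-sales
kubectl logs job/quicketl-daily-sales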
Kubernetes CronJob¶
# k8s/quicketl-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: quicketl-daily-sales
spec:
  schedule: "0 6 * * *"  # 6 AM daily
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: quicketl
              image: quicketl-pipelines:latest
              command: ["quicketl", "run", "pipelines/daily_sales.yml"]
              envFrom:
                - secretRef:
                    name: quicketl-secrets
          restartPolicy: OnFailure
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
Resource Management¶
Memory Limits¶
For large datasets:
# Use the system allocator instead of pymalloc (this does not cap memory,
# but can reduce retained memory for some workloads)
export PYTHONMALLOC=malloc
# Use memory-efficient backend
quicketl run pipeline.yml --engine polars
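If you need a hard ceiling outside Kubernetes, one option (a sketch; the 4G figure is illustrative and requires systemd plus sufficient privileges) is a transient systemd scope:
# Run the pipeline in a cgroup with a memory cap; the kernel kills the
# run, not the host, if the cap is exceeded
sudo systemd-run --scope -p MemoryMax=4G \
    quicketl run pipeline.yml --engine polars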
Disk Space¶
Clean up temporary files:
#!/bin/bash
# cleanup.sh
# Remove staging files older than 7 days
find /data/staging -type f -mtime +7 -delete
# Remove logs older than 30 days
find /var/log/quicketl -type f -mtime +30 -delete
Database Connections¶
Use connection pooling so that concurrent pipeline runs do not exhaust your database's connection limit.
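One approach (a sketch; the pooler hostname is illustrative, not a QuickETL feature) is to point the pipeline's connection at a pooler such as PgBouncer instead of directly at Postgres:
# Route Postgres traffic through PgBouncer (default port 6432) so that
# concurrent runs share a bounded pool of server connections
export DATABASE_CONNECTION=postgresql://etl_service:${DB_PASSWORD}@pgbouncer.internal:6432/analytics
quicketl run pipeline.yml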
Idempotency¶
Design pipelines to be safely re-runnable:
# Use replace mode for full refresh
sink:
  type: database
  connection: postgres
  table: analytics.daily_metrics
  mode: replace

# Or use merge for incremental
sink:
  type: database
  connection: postgres
  table: analytics.daily_metrics
  mode: merge
  merge_keys: [date, region]
Backup and Recovery¶
Backup Before Major Changes¶
#!/bin/bash
# backup_and_run.sh
TABLE="analytics.daily_metrics"
BACKUP_TABLE="${TABLE}_backup_$(date +%Y%m%d)"
# Create backup
psql -c "CREATE TABLE $BACKUP_TABLE AS SELECT * FROM $TABLE;"
# Run pipeline
if ! quicketl run pipeline.yml; then
echo "Pipeline failed, restoring from backup..."
psql -c "TRUNCATE $TABLE; INSERT INTO $TABLE SELECT * FROM $BACKUP_TABLE;"
exit 1
fi
# Cleanup old backups (keep last 7 days)
psql -c "DROP TABLE IF EXISTS ${TABLE}_backup_$(date -d '7 days ago' +%Y%m%d);"
Production Checklist¶
Before deploying to production:
- All pipelines validated (quicketl validate)
- Environment variables documented
- Secrets stored in secret manager
- Monitoring and alerting configured
- Retry logic implemented
- Backup strategy defined
- Resource limits set
- Runbook documented
- On-call rotation established
Related¶
- Error Handling - Handle failures gracefully
- Testing - Test before deploying
- Airflow Integration - Orchestration