# Python API Reference
QuickETL provides a Python API for programmatic pipeline creation and execution. Use the API when you need:
- Dynamic pipeline generation
- Integration with Python applications
- Programmatic control over execution
- Custom preprocessing or postprocessing
## Quick Start
```python
from quicketl import Pipeline

# Load and run a pipeline
pipeline = Pipeline.from_yaml("pipeline.yml")
result = pipeline.run()

print(f"Processed {result.rows_processed} rows")
```
## Core Classes

### Pipeline
The main entry point for working with QuickETL pipelines.
```python
from quicketl import Pipeline

# From YAML file
pipeline = Pipeline.from_yaml("pipeline.yml")

# From configuration dict
pipeline = Pipeline.from_config({
    "name": "my_pipeline",
    "engine": "duckdb",
    "source": {"type": "file", "path": "data.csv", "format": "csv"},
    "sink": {"type": "file", "path": "output.parquet", "format": "parquet"},
})
```
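Because `from_config` accepts a plain dict, configurations can be generated programmatically. A minimal sketch of dynamic pipeline generation, building one pipeline per region (the region names and file paths are illustrative):

```python
from quicketl import Pipeline

# Illustrative region list; in practice this might come from a database or API
regions = ["north", "south", "east", "west"]

for region in regions:
    pipeline = Pipeline.from_config({
        "name": f"sales_{region}",
        "engine": "duckdb",
        "source": {"type": "file", "path": f"data/{region}.csv", "format": "csv"},
        "sink": {"type": "file", "path": f"output/{region}.parquet", "format": "parquet"},
    })
    pipeline.run()
```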
### QuickETLEngine
Direct access to the execution engine for advanced use cases.
```python
from quicketl import QuickETLEngine

engine = QuickETLEngine(backend="duckdb")

# config is a pipeline configuration (see Configuration Models below)
result = engine.execute(config)
```
Learn more about QuickETLEngine →
### Configuration Models
Pydantic models for type-safe pipeline configuration.
```python
from quicketl.config import PipelineConfig, FileSource, FileSink

config = PipelineConfig(
    name="typed_pipeline",
    engine="duckdb",
    source=FileSource(type="file", path="input.csv", format="csv"),
    sink=FileSink(type="file", path="output.parquet", format="parquet"),
)
```
Learn more about Configuration →
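A typed config can then drive a pipeline. A small sketch, assuming Pydantic v2's `model_dump()` produces the dict shape that `Pipeline.from_config` expects:

```python
from quicketl import Pipeline

# Convert the validated Pydantic model to a plain dict for from_config
pipeline = Pipeline.from_config(config.model_dump())
result = pipeline.run()
```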
### Quality Checks
Programmatic data quality validation.
```python
from quicketl.quality import NotNullCheck, UniqueCheck

checks = [
    NotNullCheck(columns=["id", "name"]),
    UniqueCheck(columns=["id"]),
]
```
Learn more about Quality Checks →
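How checks attach to a pipeline is covered on the Quality Checks page; the sketch below assumes a hypothetical `checks` entry in the configuration dict, so treat the field name and shape as illustrative:

```python
from quicketl import Pipeline
from quicketl.quality import NotNullCheck, UniqueCheck

# Hypothetical wiring: a "checks" entry in the pipeline configuration.
# See the Quality Checks page for the actual field name and shape.
pipeline = Pipeline.from_config({
    "name": "checked_pipeline",
    "engine": "duckdb",
    "source": {"type": "file", "path": "data.csv", "format": "csv"},
    "sink": {"type": "file", "path": "output.parquet", "format": "parquet"},
    "checks": [
        NotNullCheck(columns=["id", "name"]),
        UniqueCheck(columns=["id"]),
    ],
})
result = pipeline.run()
print(f"Checks passed: {result.checks_passed}, failed: {result.checks_failed}")
```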
## Common Patterns

### Run Pipeline with Variables
```python
from quicketl import Pipeline

pipeline = Pipeline.from_yaml("pipeline.yml")
result = pipeline.run(variables={
    "DATE": "2025-01-15",
    "REGION": "north",
})
```
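Variables can also be computed at call time rather than hard-coded; for example, injecting today's date:

```python
from datetime import date

from quicketl import Pipeline

pipeline = Pipeline.from_yaml("pipeline.yml")
result = pipeline.run(variables={
    "DATE": date.today().isoformat(),  # e.g. "2025-01-15"
    "REGION": "north",
})
```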
### Validate Before Running
```python
from quicketl import Pipeline

pipeline = Pipeline.from_yaml("pipeline.yml")

# Validate configuration
errors = pipeline.validate()
if errors:
    for error in errors:
        print(f"Error: {error}")
else:
    result = pipeline.run()
```
### Dry Run
```python
from quicketl import Pipeline

pipeline = Pipeline.from_yaml("pipeline.yml")
result = pipeline.run(dry_run=True)

print(f"Would process {result.rows_processed} rows")
```
### Access Results
```python
from quicketl import Pipeline

pipeline = Pipeline.from_yaml("pipeline.yml")
result = pipeline.run()

print(f"Pipeline: {result.pipeline_name}")
print(f"Status: {result.status}")
print(f"Duration: {result.duration_ms}ms")
print(f"Rows processed: {result.rows_processed}")
print(f"Rows written: {result.rows_written}")
print(f"Checks passed: {result.checks_passed}")
print(f"Checks failed: {result.checks_failed}")
```
### Get Result DataFrame
```python
from quicketl import Pipeline

pipeline = Pipeline.from_yaml("pipeline.yml")
result = pipeline.run()

# Access the result as a DataFrame
df = result.to_dataframe()
print(df.head())
```
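From there, standard DataFrame operations apply. Assuming `to_dataframe()` returns a pandas DataFrame, and using hypothetical `region` and `amount` columns for illustration:

```python
# Hypothetical columns, shown only to illustrate downstream analysis
summary = df.groupby("region")["amount"].sum().sort_values(ascending=False)
print(summary)
```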
## Error Handling
```python
from quicketl import Pipeline
from quicketl.exceptions import (
    QuickETLError,
    ConfigurationError,
    ExecutionError,
    QualityCheckError,
)

try:
    pipeline = Pipeline.from_yaml("pipeline.yml")
    result = pipeline.run()
except ConfigurationError as e:
    print(f"Configuration error: {e}")
except QualityCheckError as e:
    print(f"Quality check failed: {e}")
    print(f"Failed checks: {e.failed_checks}")
except ExecutionError as e:
    print(f"Execution error: {e}")
except QuickETLError as e:
    print(f"QuickETL error: {e}")
```
## Integration Examples

### With Airflow
```python
from airflow.decorators import task

@task
def run_quicketl_pipeline(config_path: str, **kwargs):
    from quicketl import Pipeline

    pipeline = Pipeline.from_yaml(config_path)
    result = pipeline.run(variables=kwargs)
    return {
        "status": result.status,
        "rows": result.rows_written,
    }
```
### With FastAPI
```python
from fastapi import FastAPI, BackgroundTasks
from quicketl import Pipeline

app = FastAPI()

@app.post("/pipelines/{name}/run")
async def run_pipeline(name: str, background_tasks: BackgroundTasks):
    def execute():
        pipeline = Pipeline.from_yaml(f"pipelines/{name}.yml")
        return pipeline.run()

    background_tasks.add_task(execute)
    return {"status": "started"}
```
### With Prefect
```python
from prefect import flow, task
from quicketl import Pipeline

@task
def run_etl(config_path: str):
    pipeline = Pipeline.from_yaml(config_path)
    return pipeline.run()

@flow
def etl_flow():
    result = run_etl("pipeline.yml")
    print(f"Processed {result.rows_written} rows")
```
## API Reference
| Module | Description |
|---|---|
| Pipeline | Main pipeline class |
| QuickETLEngine | Execution engine |
| Config | Configuration models |
| Quality | Quality check classes |
## Related
- CLI Reference - Command-line interface
- Pipeline YAML - YAML configuration
- Airflow Integration - Running pipelines from Airflow