QuickETLEngine Class¶
The QuickETLEngine class provides low-level access to the QuickETL execution engine. Use it when you need direct control over each step of execution: reading sources, applying transforms, running checks, and writing sinks.
Use Pipeline Instead
For most use cases, the Pipeline class is recommended. Use QuickETLEngine only when you need low-level control.
Import¶
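```python
from quicketl import QuickETLEngine
```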
Constructor¶
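Based on the parameter table below, the constructor takes a backend name plus backend-specific keyword options:

```python
QuickETLEngine(
    backend: str = "duckdb",
    **options
)
```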
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `backend` | `str` | `"duckdb"` | Backend engine name |
| `**options` | | | Backend-specific options |
Supported Backends:
- `duckdb` - DuckDB (default)
- `polars` - Polars
- `pandas` - Pandas
- `spark` - Apache Spark
- `datafusion` - Apache DataFusion
- `snowflake` - Snowflake
- `bigquery` - Google BigQuery
- `postgres` - PostgreSQL
- `mysql` - MySQL
- `clickhouse` - ClickHouse
Example:
```python
# Default (DuckDB)
engine = QuickETLEngine()

# Specific backend
engine = QuickETLEngine(backend="polars")

# With backend options
engine = QuickETLEngine(
    backend="spark",
    master="local[*]",
    executor_memory="4g"
)
```
Methods¶
execute¶
Execute a pipeline configuration.
```python
QuickETLEngine.execute(
    config: PipelineConfig | dict,
    variables: dict[str, str] | None = None,
    dry_run: bool = False
) -> ExecutionResult
```
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `config` | `PipelineConfig \| dict` | Pipeline configuration |
| `variables` | `dict[str, str] \| None` | Variable substitutions |
| `dry_run` | `bool` | Execute without writing to the sink |
Returns: ExecutionResult
Example:
```python
from quicketl import QuickETLEngine
from quicketl.config import PipelineConfig

engine = QuickETLEngine(backend="duckdb")

config = {
    "name": "test",
    "source": {"type": "file", "path": "data.csv", "format": "csv"},
    "sink": {"type": "file", "path": "out.parquet", "format": "parquet"}
}

result = engine.execute(config)
```
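The `dry_run` flag runs the same pipeline without writing to the sink; with `dry_run=True`, the result table is exposed on the returned ExecutionResult (see below). The variable name in the commented line is purely illustrative:

```python
# Validate and preview the pipeline without writing any output
result = engine.execute(config, dry_run=True)

if result.success:
    # With dry_run=True the result table is available for inspection
    print(result.table.limit(5).execute())

# Variables (dict[str, str]) are substituted into the configuration, e.g.:
# result = engine.execute(config, variables={"run_date": "2024-01-01"})
```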
read_source¶
Read data from a source configuration.
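The signature below is inferred from the parameters and return type documented here:

```python
QuickETLEngine.read_source(
    source_config: SourceConfig | dict
) -> Table
```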
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `source_config` | `SourceConfig \| dict` | Source configuration |
Returns: Ibis table expression
Example:
```python
engine = QuickETLEngine()

table = engine.read_source({
    "type": "file",
    "path": "data/sales.csv",
    "format": "csv"
})

# Now you can use Ibis operations
filtered = table.filter(table.amount > 100)
result = filtered.execute()
```
write_sink¶
Write data to a sink configuration.
```python
QuickETLEngine.write_sink(
    table: Table,
    sink_config: SinkConfig | dict,
    mode: str = "replace"
) -> None
```
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `table` | `Table` | Ibis table expression |
| `sink_config` | `SinkConfig \| dict` | Sink configuration |
| `mode` | `str` | Write mode: `"replace"` or `"append"` |
Example:
```python
engine = QuickETLEngine()

# Read and transform
table = engine.read_source({"type": "file", "path": "in.csv", "format": "csv"})
filtered = table.filter(table.status == "active")

# Write
engine.write_sink(
    filtered,
    {"type": "file", "path": "out.parquet", "format": "parquet"},
    mode="replace"
)
```
apply_transform¶
Apply a single transform to a table.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `table` | `Table` | Input Ibis table |
| `transform` | `TransformConfig \| dict` | Transform configuration |
Returns: Transformed Ibis table
Example:
```python
engine = QuickETLEngine()
table = engine.read_source(source_config)

# Apply transforms one by one
table = engine.apply_transform(table, {"op": "filter", "predicate": "amount > 0"})
table = engine.apply_transform(table, {"op": "select", "columns": ["id", "amount"]})
```
apply_transforms¶
Apply multiple transforms to a table.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `table` | `Table` | Input Ibis table |
| `transforms` | `list` | List of transform configurations |
Returns: Transformed Ibis table
Example:
```python
engine = QuickETLEngine()
table = engine.read_source(source_config)

transforms = [
    {"op": "filter", "predicate": "amount > 0"},
    {"op": "select", "columns": ["id", "name", "amount"]},
    {"op": "sort", "by": [{"column": "amount", "order": "desc"}]}
]

result = engine.apply_transforms(table, transforms)
```
run_checks¶
Execute quality checks on a table.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `table` | `Table` | Table to validate |
| `checks` | `list` | List of check configurations |
Returns: CheckResults with pass/fail details
Example:
```python
engine = QuickETLEngine()
table = engine.read_source(source_config)

checks = [
    {"check": "not_null", "columns": ["id", "name"]},
    {"check": "unique", "columns": ["id"]},
    {"check": "row_count", "min": 1}
]

results = engine.run_checks(table, checks)

print(f"Passed: {results.passed}, Failed: {results.failed}")
for check in results.details:
    print(f"  {check.name}: {check.status}")
```
get_connection¶
Get the underlying Ibis connection.
Returns: Ibis backend connection
Example:
```python
engine = QuickETLEngine(backend="duckdb")
conn = engine.get_connection()

# Execute raw SQL
result = conn.raw_sql("SELECT * FROM read_csv('data.csv') LIMIT 10")
```
Properties¶
backend¶
The configured backend name.
is_connected¶
Whether the engine has an active connection.
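Example:

```python
engine = QuickETLEngine(backend="polars")

print(engine.backend)       # "polars"
print(engine.is_connected)  # True while the engine holds an active connection
```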
ExecutionResult¶
Result returned by execute().
| Attribute | Type | Description |
|---|---|---|
| `success` | `bool` | Whether execution succeeded |
| `duration_ms` | `float` | Execution time in milliseconds |
| `rows_processed` | `int` | Rows read |
| `rows_written` | `int` | Rows written |
| `check_results` | `CheckResults \| None` | Quality check results |
| `error` | `Exception \| None` | Error if execution failed |
| `table` | `Table \| None` | Result table (when `dry_run=True`) |
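For example, using the attributes above (and the `engine` and `config` objects from the earlier examples):

```python
result = engine.execute(config)

if result.success:
    print(f"Read {result.rows_processed} rows, wrote {result.rows_written}")
    print(f"Finished in {result.duration_ms:.1f} ms")
else:
    # error holds the exception that caused the failure
    print(f"Pipeline failed: {result.error}")
```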
CheckResults¶
Result returned by run_checks().
| Attribute | Type | Description |
|---|---|---|
| `passed` | `int` | Number passed |
| `failed` | `int` | Number failed |
| `total` | `int` | Total checks |
| `details` | `list[CheckResult]` | Individual results |
Complete Example¶
```python
from quicketl import QuickETLEngine

# Initialize engine
engine = QuickETLEngine(backend="duckdb")

# Read source
table = engine.read_source({
    "type": "file",
    "path": "data/sales.csv",
    "format": "csv"
})

# Apply transforms
transforms = [
    {"op": "filter", "predicate": "status = 'completed'"},
    {"op": "derive_column", "name": "total", "expr": "quantity * price"},
    {"op": "aggregate", "group_by": ["category"], "aggregations": {"revenue": "sum(total)"}}
]
table = engine.apply_transforms(table, transforms)

# Run quality checks
checks = [
    {"check": "not_null", "columns": ["category", "revenue"]},
    {"check": "expression", "expr": "revenue >= 0"}
]
check_results = engine.run_checks(table, checks)

if check_results.failed > 0:
    print("Quality checks failed!")
    for detail in check_results.details:
        if not detail.passed:
            print(f"  - {detail.name}: {detail.message}")
else:
    # Write output
    engine.write_sink(
        table,
        {"type": "file", "path": "output/revenue.parquet", "format": "parquet"}
    )
    print("Pipeline completed successfully")
```
Related¶
- Pipeline - High-level pipeline API
- Configuration Models - Configuration types
- Backend Selection - Choosing backends