# Database Sinks

Write transformed data to relational databases.
## Basic Usage
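A minimal sink block, sketched from the fields in the Configuration table below (`results` is a placeholder table name):

```yaml
sink:
  type: database
  connection: postgresql://localhost/db
  table: results
```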
## Configuration

| Field | Required | Default | Description |
|---|---|---|---|
| `type` | Yes | - | Must be `database` |
| `connection` | Yes | - | Database connection string |
| `table` | Yes | - | Target table name |
| `mode` | No | `append` | Write mode: `append` or `truncate` |
## Write Modes

### Append (Default)

Add new rows to the existing data:
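For example (a minimal sketch; the table name is a placeholder):

```yaml
sink:
  type: database
  connection: ${DATABASE_URL}
  table: events
  mode: append  # optional; append is the default
```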
### Truncate

Delete existing data before writing:
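For example (the same sketch, switched to truncate mode):

```yaml
sink:
  type: database
  connection: ${DATABASE_URL}
  table: active_products
  mode: truncate
```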
> **Warning: Truncate is destructive.** Truncate deletes all existing data in the table before inserting new data.
## Connection Strings

### PostgreSQL
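A typical PostgreSQL URL, in the standard SQLAlchemy-style form (host, port, credentials, and database name are placeholders):

```
postgresql://user:password@localhost:5432/mydb
```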
### MySQL
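The MySQL form is analogous (placeholders again):

```
mysql://user:password@localhost:3306/mydb
```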
### Using Environment Variables
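Avoid hardcoding credentials in pipeline files; the `${VAR}` syntax used throughout this page reads the value from the environment instead:

```yaml
sink:
  type: database
  connection: ${DATABASE_URL}
  table: results
```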
## Examples

### Daily Summary Table

```yaml
name: daily_sales_summary
engine: duckdb

source:
  type: file
  path: s3://data-lake/raw/sales/${DATE}/*.parquet

transforms:
  - op: aggregate
    group_by: [region, category]
    aggs:
      total_sales: sum(amount)
      order_count: count(*)

sink:
  type: database
  connection: ${POSTGRES_URL}
  table: daily_summaries
  mode: append
```
### Incremental Load

```yaml
name: incremental_load
engine: duckdb

source:
  type: database
  connection: ${SOURCE_DB}
  query: |
    SELECT * FROM orders
    WHERE updated_at > '${LAST_RUN}'

transforms:
  - op: select
    columns: [id, customer_id, amount, status, updated_at]

sink:
  type: database
  connection: ${TARGET_DB}
  table: orders_replica
  mode: append
```
### Full Refresh

```yaml
name: full_refresh
engine: duckdb

source:
  type: file
  path: data/products.csv
  format: csv

transforms:
  - op: filter
    predicate: active = true

sink:
  type: database
  connection: ${DATABASE_URL}
  table: active_products
  mode: truncate
```
## Table Requirements

### Table Must Exist

QuickETL does not create tables automatically. Create the table first:

```sql
CREATE TABLE sales_summary (
    region VARCHAR(50),
    category VARCHAR(50),
    total_sales DECIMAL(12,2),
    order_count INTEGER,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```
### Column Matching

Output columns must match the table's columns:

```yaml
transforms:
  # Ensure the output matches the table schema
  - op: select
    columns: [region, category, total_sales, order_count]
```
### Data Types

QuickETL attempts to convert types automatically. For best results:

- Use column types compatible with your data
- Cast explicitly if needed, as in the sketch below
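A sketch of an explicit cast. QuickETL's cast syntax isn't shown on this page, so the `cast` op and its fields here are assumptions:

```yaml
transforms:
  # Assumed cast op: coerce amount to the column's DECIMAL type before writing
  - op: cast
    columns:
      amount: decimal(12,2)
```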
## Python API

```python
from quicketl.config.models import DatabaseSink

# Basic
sink = DatabaseSink(
    connection="postgresql://localhost/db",
    table="results",
)

# With truncate
sink = DatabaseSink(
    connection="${DATABASE_URL}",
    table="sales_summary",
    mode="truncate",
)
```
## Performance Tips

### Batch Size

For large datasets, writes are batched automatically. Performance varies by database.
### Indexes

For very large inserts, it can be faster to disable indexes before loading and rebuild them afterwards. The syntax is database-specific; the example below uses SQL Server syntax (PostgreSQL has no `ALTER INDEX ... DISABLE`; drop and recreate the index instead):

```sql
-- Before the pipeline
ALTER INDEX idx_sales_date ON sales_summary DISABLE;

-- After the pipeline
ALTER INDEX idx_sales_date ON sales_summary REBUILD;
```
### Truncate vs Delete

`truncate` is faster than deleting all rows individually:
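Roughly, the difference at the SQL level (generic SQL; logging and locking details vary by database):

```sql
-- TRUNCATE deallocates the table's storage in one operation
TRUNCATE TABLE sales_summary;

-- DELETE removes rows one at a time and logs each deletion
DELETE FROM sales_summary;
```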
### Connection Pooling

For high-frequency pipelines, consider connection pooling at the database level.
## Troubleshooting

### Table Not Found

Create the table before running the pipeline.
### Column Mismatch

Ensure your output columns match the table schema:
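For example, select exactly the columns the target table defines and drop everything else (reusing the column list from the incremental load example above):

```yaml
transforms:
  # Drop any columns the target table does not define
  - op: select
    columns: [id, customer_id, amount, status, updated_at]
```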
### Type Mismatch

Cast columns to the correct types:
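As in the Data Types section above, an explicit cast can fix this (the `cast` op shown is an assumption about QuickETL's transform vocabulary):

```yaml
transforms:
  # Assumed cast op: match the table's INTEGER column
  - op: cast
    columns:
      order_count: integer
```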
### Permission Denied

Verify the database user has `INSERT` (and `TRUNCATE`, if using truncate mode) permissions.
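On PostgreSQL, for example, grants like these cover both modes (`etl_user` is a placeholder role):

```sql
GRANT INSERT ON active_products TO etl_user;
GRANT TRUNCATE ON active_products TO etl_user;  -- only needed for truncate mode
```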
## Limitations

### No Upsert (Yet)

Upsert/merge operations are planned for a future release. Currently, use:

- `append` for incremental loads
- `truncate` for full refreshes
### No Schema Management

QuickETL does not create or modify table schemas. Manage schemas separately.
## Related

- Database Sources - Reading from databases
- File Sinks - Alternative: write to files
- Backends - Database backend details