Starlake: Open Source Data Integration & ETL Platform
Starlake is an enterprise-grade data pipeline platform that transforms how organizations handle data integration. Using a declarative approach with YAML and SQL, it eliminates complex coding while ensuring robust data governance and quality.
Revolutionize Your Data Workflows
Starlake is to Data Engineering what Terraform is to infrastructure. Using a declarative YAML syntax, describe your load and transform tasks and let Starlake automatically generate your orchestration code.
1. Declarative Extract
Turn database extraction into a declarative process:
- Zero-code data extraction with YAML configurations
- Support for any ODBC/JDBC-compliant database
- Support for incremental and full loads
- Automated schema evolution handling
How it works: Let's say we want to extract data from a PostgreSQL database on a daily basis
extract:
  connectionRef: "starbake" # or mssql-adventure-works-db if extracting from SQL Server
  jdbcSchemas:
    - schema: "sales"
      tables:
        - name: "order_lines" # table name or simply "*" to extract all tables
          #partitionColumn: "salesorderdetailid" # (optional) you may parallelize the extraction based on this field
          #fetchSize: 100 # (optional) the number of rows to fetch at a time
          #timestamp: salesdatetime # (optional) the timestamp field to use for incremental extraction
      tableTypes:
        - "TABLE"
        #- "VIEW"
        #- "SYSTEM TABLE"
        #- "GLOBAL TEMPORARY"
        #- "LOCAL TEMPORARY"
        #- "ALIAS"
        #- "SYNONYM"
That's it, we have defined our extraction pipeline.
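For incremental extraction, you would, per the comments above, set the timestamp option on the table so that each run only pulls rows newer than the previous extraction. A minimal sketch based on the same configuration:
extract:
  connectionRef: "starbake"
  jdbcSchemas:
    - schema: "sales"
      tables:
        - name: "order_lines"
          timestamp: salesdatetime # only rows with a newer salesdatetime are extracted on subsequent runs
      tableTypes:
        - "TABLE"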
Visit our extraction user guide to learn more
2. Declarative Load
Transform data ingestion into a declarative process:
- Zero-code data loading with YAML configurations
- Support for CSV, TSV, JSON, XML, fixed-width, Parquet, and Avro file formats
- Automated data quality validation
- Built-in privacy and security controls
- Support for all major data warehouses
- Apply column- and row-level security
Let's say we want to load the data extracted in the previous example into a data warehouse
---
# this YAML file has been automatically generated by the infer-schema command
table:
  pattern: "order_lines.*.psv" # regular expression used to match incoming file names; adjust it to your own naming convention
  schedule: "when_available" # (optional) cron expression or "when_available" to schedule the loading
  metadata:
    mode: "FILE"
    format: "CSV" # (optional) auto-detected if not specified
    encoding: "UTF-8"
    withHeader: yes # (optional) auto-detected if not specified
    separator: "|" # (optional) auto-detected if not specified
    writeStrategy:
      type: "UPSERT_BY_KEY_AND_TIMESTAMP"
      timestamp: sale_date
      key: [productid]
  attributes: # description of the fields to recognize
    - name: "productid" # attribute name and column name in the destination table if no rename attribute is defined
      type: "string" # expected type
      required: false # is this field required in the source (false by default, change it accordingly)?
      privacy: "NONE" # should we encrypt this field before loading to the warehouse (no encryption by default)?
      ignore: false # should this field be excluded (false by default)?
    - name: "sale_date" # second attribute
      type: "timestamp" # auto-detected if not specified
    - name: "unitprice"
      type: "float"
...
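For illustration, a hypothetical incoming file matching this configuration (say order_lines.2024-01-15.psv, an invented name that matches the pattern) would be pipe-separated with a header row:
productid|sale_date|unitprice
P-1001|2024-01-15T10:24:00|19.99
P-1002|2024-01-15T11:02:00|5.49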
That's it, we have defined our loading pipeline.
Visit our load tutorial to learn more.
3. Declarative Transform
Simplify transformations with SQL and YAML:
- Write jinja-free SQL SELECT statements, no more jinja refs in your code (see the snippet after this list)
- Automatic column- and table-level lineage
- Built-in support for incremental processing
- Apply column- and row-level security
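As a simplified illustration of the first point, tables are referenced directly by name and lineage is inferred from the statement itself, instead of going through templated ref() calls as in some other tools:
-- plain SQL: tables are referenced directly by name, no templating layer
SELECT productid, unitprice
FROM order_lines -- rather than FROM {{ ref('order_lines') }}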
Let's say we want to build aggregates from the previously loaded data
transform:
  default:
    writeStrategy:
      type: "OVERWRITE"
  tasks:
    - name: most_profitable_products
      writeStrategy:
        type: "UPSERT_BY_KEY_AND_TIMESTAMP"
        timestamp: signup
        key: [id]
and the corresponding SQL query for the most_profitable_products task:
SELECT -- the SQL query will be translated into the appropriate MERGE INTO or INSERT OVERWRITE statement
    productid,
    SUM(unitprice * orderqty) AS total_revenue
FROM order_lines
GROUP BY productid
ORDER BY total_revenue DESC
Starlake automatically applies the right write strategy (INSERT OVERWRITE or MERGE INTO) based on the writeStrategy property and the input/output tables.
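As a rough, hand-written sketch of the idea behind UPSERT_BY_KEY_AND_TIMESTAMP (not the exact SQL Starlake emits, which is generated per target warehouse; key_col, ts_col, value and the table names are placeholders):
-- rows are matched on the key; an existing row is updated only if the incoming timestamp is newer
MERGE INTO target_table AS t
USING incoming_rows AS i
  ON t.key_col = i.key_col
WHEN MATCHED AND i.ts_col > t.ts_col THEN
  UPDATE SET value = i.value, ts_col = i.ts_col
WHEN NOT MATCHED THEN
  INSERT (key_col, ts_col, value) VALUES (i.key_col, i.ts_col, i.value)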
Visit our transform tutorial to learn more
4. Declarative Tests
Run your load and transform tasks against an embedded, local DuckDB database to reduce costs, thanks to the Starlake SQL transpiler.
Without any modification to your original queries, Starlake transpiles them to DuckDB SQL on the fly during the testing phase.
To write a test, simply place, in a folder named after your load or transform task, the test data, a query to run against the target table, and the result that query is expected to return.
In the example below, we run a test to validate the load task on the table sales.order_lines.
.
├── load
│   ├── sales
│   │   └── order_lines
│   │       └── test_order_lines_with_10_orders
│   │           ├── _expected_10_orders.sql            # SQL request to run against the target table
│   │           ├── _expected_10_orders.json           # expected data when _expected_10_orders.sql is run on the target table
│   │           └── _incoming.sales_orders_lines.json  # test data
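For example (hypothetical content, shown only to illustrate the convention), _expected_10_orders.sql could hold the check to run against the loaded table:
SELECT COUNT(*) AS order_count FROM sales.order_lines
and _expected_10_orders.json the rows that query is expected to return:
[ { "order_count": 10 } ]
while _incoming.sales_orders_lines.json contains the ten order lines to ingest before the check runs.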
Visit our test tutorial to learn more
5. Declarative Orchestration
Automate your entire data pipeline:
- Support for Airflow, Dagster, and Snowflake orchestrators
- Visual pipeline monitoring
- Efficient parallel execution
- Automated error handling and recovery
Define your load DAG template
dag:
  comment: "dag for loading all {{domain}} tables"
  template: "load/airflow__scheduled_table__shell.py.j2" # select one of the pre-existing templates
  filename: "airflow_{{domain}}_tables.py"
Define your transform DAG template
dag:
  comment: "dag for transforming domain {{domain}} with schedule {{schedule}}"
  template: "transform/airflow__scheduled_task__shell.py.j2"
  filename: "airflow_{{domain}}_{{schedule}}_tasks.py"
Visit our orchestration tutorial to learn more