
4 posts tagged with "Starlake"


Starlake OSS - Bringing Declarative Programming to Data Engineering and Analytics

· 6 min read
Hayssam Saleh
Starlake Core Team

Introduction

The advent of declarative programming through tools like Ansible and Terraform has revolutionized infrastructure deployment by allowing developers to achieve their intended goals without specifying the order of code execution.

This paradigm shift brings forth benefits such as reduced error rates, significantly shortened development cycles, enhanced code readability, and increased accessibility for developers of all levels.

This is the story of how a small team of developers crafted a platform that goes beyond the boundaries of conventional data engineering by applying a declarative approach to data extraction, loading, transformation and orchestration.

Starlake

The Genesis

Back in 2015, at the helm of ebiznext, a boutique data engineering company, we faced a daunting challenge. Our client, a prominent entity in need of a robust big data solution, sought to harness the power of Hadoop and Spark. Despite our modest size (20 people), we dared to compete against industry giants with vastly greater resources (100,000+ headcount).

Our only chance to succeed was to innovate: we needed a data platform that could outperform by orders of magnitude the traditional ETL solutions pushed by our competitors. With these GUI-based ETL tools, building data pipelines requires an effort proportional to the number and complexity of the sources.

Determined to disrupt this norm, we embarked on a quest to devise a DevOps friendly platform capable of lightning-fast data ingestion from any source, without the drawbacks of ETLs or specialized engineering skills.

On the day of the tender, our ability to deliver a solution that could load data in a few weeks instead of many months allowed us to stand out from the competition and win the project.

Episode 1: Smartlake Emerges

note

The basic idea behind Smartlake was that no data warehouse stays clean if data quality is checked only after the data has been loaded, and that these pre-load quality checks need to be handled by the data owners.

This left us with little choice but to embrace the declarative approach. Empowering business users, we devised a system where data formats and transformations could be described in simple JSON files. Smartlake wasn’t merely a code generator; it was a versatile engine, seamlessly ingesting diverse data formats, executing transformations, and orchestrating operations with unparalleled efficiency.

To streamline user interaction, we devised an intuitive Excel-to-JSON converter, enabling effortless specification of input formats. Thanks to Smartlake and its declarative approach, the business users were able to define load and transformation operations in a matter of minutes.

Smartlake standout features

  • Load almost any file format at Spark speed (CSV, JSON, XML, fixed width, multi-record types …) or Kafka topic

  • Validate fields using user-defined schemas with user-defined semantic types

  • Apply transformations on the fly to data being loaded (GDPR, normalisation, computed fields ...) with and without schema evolution

  • Sink to almost any target including Spark, Kafka, Elasticsearch.

Episode 2: Evolution to Starlake

note

The basic idea behind Starlake was to bring in all Smartlake benefits to the cloud by leveraging serverless services and Cloud Datawarehouses capabilities while minimising development and execution costs.

As the data landscape evolved, so did our vision. Cloud data warehouses emerged as formidable competitors to Spark for query execution. Recognizing this shift, we evolved Smartlake into Starlake, preserving its declarative essence while embracing YAML for enhanced readability. We kept Spark’s prowess for data ingestion, running inside one or more containers, while leveraging cloud data warehouses for query execution.

This strategic blend allowed us to optimize performance and cost-effectiveness based on specific workload requirements. The result was a reimagined platform, tailored for the cloud era, yet grounded in the principles of efficiency and simplicity that defined its inception.

The result is the Starlake OSS project that you can find on GitHub.

The capabilities of Starlake are extensively described here.

The people behind Starlake

Smartlake, the precursor to Starlake, owes its existence to the collective efforts of numerous individuals, but a select few stand out for their exceptional contributions:

  • Sam Bessalah With Sam’s presence, rallying others became effortless. His visionary outlook and knack for simplifying complexities proved transformative, setting a new standard for implementation.

  • Olivier Girardot Every team has its coding wizard, and Olivier filled that role impeccably. From leveraging Spark codegen to exploring mathematical frameworks like matryoshka, he pushed the boundaries, mentoring the team with his expertise spanning Docker, Ansible, Python, Scala and Spark internals.

  • Valentin Kasas Valentin championed functional programming in Scala. Introducing concepts like recursion schemes, he empowered the team to craft code that was not just functional but also elegant and maintainable.

As the journey progressed towards the cloud, long-time data experts joined and made Starlake what it is today:

  • Bounkong Khamphousone The speed and efficiency of Starlake’s extraction and load engines owe much to his contributions.

  • Mohamad Kassir His direct involvement with customer projects and his in-depth knowledge of cloud platforms and business needs have been major assets in the evolution of Starlake.

  • Abdelhamide EL ARIB An early contributor to the load engine, his foresight and execution prowess played a significant role in shaping the platform’s capabilities today.

  • Stephane Manciot The developer behind Starlake’s declarative workflows on top of Airflow and Dagster, he was pivotal in shaping its operational backbone.

  • Cyrille Chépélov A master of codebase optimisation, Cyrille led rewrite efforts that were instrumental in ensuring the reentrant nature of Starlake’s API.

The next journey

Today with hundreds of gigabytes of data loaded and transformed daily into thousands of tables in various data warehouses, we can confidently say that Starlake is battle tested and ready for the most demanding data engineering & analytics challenges.

As Starlake is, and will always be, open source, join us in building a supportive community. Your insights and feature requests aren’t just welcome, they guide our roadmap.

Get Started with Starlake:

Join our community on GitHub

P.S. Please star the repository: https://github.com/starlake-ai/starlake. Also, if you run into any issue or would like an enhancement, please just report it; it will be taken care of, of course.

Column and Row Level Security in BigQuery

· 3 min read
Hayssam Saleh
Starlake Core Team Member

Data exposition strategies

Data may be exposed using views or authorized views, and more recently using row / column level security.

Historically, to restrict access to specific columns or rows in BigQuery, one creates an (authorized) view with a SQL request like the one below:

CLS / RLS using Views
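
As a rough sketch, assuming a hypothetical PRODUCT table with id, product, date, price and category columns, such a view restricting both columns and rows might look like this:

Example view (sketch)
CREATE VIEW mydataset.product_food_view AS
SELECT
  id,
  product,
  date        -- the sensitive "price" column is deliberately excluded
FROM mydataset.PRODUCT
WHERE category = 'Food';  -- only "Food" rows are exposed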

BigQuery views require end users to be granted access to the table on top of which the view is created. To bypass that limitation, BigQuery provides authorized views. However, authorized views come with the following restrictions:

  1. The underlying table is accessed through the authorized view, where the end user is impersonated, thus losing, at the table level, the identity of the user making the request.

  2. Each restriction policy requires defining a specific authorized view, making it difficult to identify who has access to what.

  3. Authorized views need to be updated whenever a schema evolution on the underlying table brings in a sensitive field that needs to be excluded, or a field that needs to be included in the view. For instance, a new column "description" would need to be added to the authorized view if we wanted to expose it.

That's where the Row Level Security and Column Level Security features natively supported by BigQuery come in.

BigQuery Row Level Security

Row Level Security restricts access to rows based on a filter condition defined in a custom SQL statement, as shown below:

RLS
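
As a sketch, assuming the same hypothetical PRODUCT table, a row access policy granting a group access only to 'Food' rows could look like this:

Row access policy (sketch)
CREATE ROW ACCESS POLICY food_rows_only
ON mydataset.PRODUCT
GRANT TO ('group:financegroup@company.com')
FILTER USING (category = 'Food');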

Big Query Column Level Security

Column level security in BigQuery is managed using a taxonomy. This taxonomy is a hierarchy of policy tags describing the table attributes or other resources. By assigning access rights to a tag, we restrict access to any resource tagged with this specific tag, and this applies to BigQuery table fields.

In our example, restricting access to the column price to a specific user/group/service account requires the following steps:

  1. In Cloud Data Catalog/Policy Tags, create a taxonomy. Note that Enforce access control should be checked.

CLS Taxonomy

  2. Assign permissions for each policy tag you defined.

CLS Access

  3. Tag restricted columns in the BigQuery schema editor.

CLS Assign
tip

Assigning policy tags may be done using the bq load/update command line tool

BigQuery RLS/CLS benefits

Using BigQuery row and column level security features brings several benefits:

  • There is no need to create extra views
  • Users query the table under the same name but with different access rights
  • A company-wide taxonomy is defined allowing better Data Management
  • Access rights to a new column in the table are automatically handled

A word about RLS and CLS in Starlake

Ingesting data into BigQuery cannot be considered complete without taking into account the access-level restrictions on the target table. Starlake handles for you all the scripting required to secure BigQuery rows and columns, using a declarative YAML syntax to make sure that your tables are secured in BigQuery:

Declarative Row Level & Column Level Security
  - name: "PRODUCT"
rls:
- name: "my-rls"
predicate: "category like 'Food'"
grants:
- "user:me@company.com"
- "group:financegroup@company.com"
- "sa:serviceacount@gserviceaccount.com"
attributes:
- name: "id"
accessPolicy: PII

Handling Dynamic Partitioning and Merge with Spark on BigQuery

· 7 min read
Hayssam Saleh
Starlake Core Team Member

Data Loading strategies

When loading data into BigQuery, you may want to:

  • Overwrite the existing data and replace it with the incoming data.
  • Append incoming data to the existing data.
  • Dynamic partition overwrite, where only the partitions to which the incoming data belongs are overwritten.
  • Merge incoming data with existing data by keeping the newest version of each record.

For performance reasons, when dealing with huge amounts of data, tables are usually split into multiple partitions. BigQuery supports range partitioning, which is uncommon, and date/time partitioning, which is the most widely used type of partitioning. The diagram below shows our initial table partitioned by the date field.

Initial data
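
For reference, a date-partitioned table like the one used throughout this post could be created with a BigQuery DDL statement along these lines (the table and column names are assumed from the examples below):

Create a date-partitioned table (sketch)
CREATE TABLE `project-id.dataset.table` (
  date    DATE,
  product STRING,
  price   NUMERIC
)
PARTITION BY date;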

Let's assume we receive the following data that we need to ingest into the table:

Incoming data

The strategies above will respectively produce the results below. With the Overwrite strategy, for example, the table ends up with only the 2 incoming records: all existing partitions are deleted.

Overwrite data

There is no good or bad strategy; the right choice depends on the use case. Some example use cases for each strategy:

  • Overwrite mode may be useful when you receive every day the list of all product names.
  • Append mode may be useful when you receive daily sales.
  • Dynamic Partition Overwrite mode may be useful when you have already ingested a partition and need to ingest it again with a different set of data, altering only that partition.
  • Merge mode may be useful when you receive product updates every day and you need to keep only the last version of each product.

Spark How-to

The Apache Spark SQL connector for Google BigQuery makes BigQuery a first-class citizen as a source and sink for Spark jobs.

Append and Overwrite modes in Spark

BigQuery is supported by Spark as a source and sink through the Spark BigQuery connector.

Spark comes out of the box with the ability to append or overwrite existing data using a predefined save mode:


val incomingDF = ... // Incoming data loaded with the correct schema
val bqTable = "project-id.dataset.table"
val saveMode = SaveMode.Overwrite // or SaveMode.Append to append the data

incomingDF.write
  .mode(saveMode)
  .partitionBy("date")
  .format("com.google.cloud.spark.bigquery")
  .option("table", bqTable)
  .save()

Dynamic Partition Overwrite mode in Spark

To activate dynamic partitioning, you need to set the configuration below before saving the data using the exact same code above:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

Unfortunately, the BigQuery Spark connector does not support this feature (at the time of writing). We need to manually delete the partitions we want to overwrite first and then append the incoming data.

Assuming the table is partitioned by the field date and the incoming data loaded in the incomingDF dataframe, the code below will remove existing partitions that need to be overwritten.

Delete partitions that need to be updated
val incomingDF = ... // Incoming data loaded with the correct schema

incomingDF
  .select(date_format(col("date"), "yyyyMMdd").cast("string"))
  .distinct()
  .collect()
  .map(_.getString(0))
  .foreach { partition =>
    // Delete the partition decorator "table$yyyyMMdd" for each incoming date
    bigQueryClient.deleteTable(TableId.of(datasetName, s"$table$$$partition"))
  }
tip

To drop a table partition using the Google Cloud bq command line tool, you may use the following syntax:

bq rm -t 'project-id.dataset.table$YYYYMMDD'

We now need to append the incomingDF to mimic the dynamic partition overwrite feature:

Append incoming partitions
val incomingDF = ... // Incoming data loaded with the correct schema
val bqTable = "project-id.dataset.table"
val saveMode = SaveMode.Append

incomingDF.write
  .mode(saveMode)
  .partitionBy("date")
  .format("com.google.cloud.spark.bigquery")
  .option("table", bqTable)
  .save()
caution

The issue with this approach is that if the program crashes while appending the incoming data, the partitions will already have been deleted and data would be lost. However, you can still ingest the same file again after a failure and the end result will be the same.

Dynamic Partition Merge in Spark

When you need to keep only the last version of the record for each product, both BigQuery and Databricks (the company behind Spark, in case you have lived on the moon for the last ten years) support the MERGE SQL statement:

Merge records using SQL statement
MERGE INTO target_table
USING incoming_table
ON target_table.product = incoming_table.product
WHEN NOT MATCHED
THEN INSERT *
WHEN MATCHED AND incoming_table.date > target_table.date THEN
UPDATE SET *
/*
WHEN MATCHED AND incoming_table.date <= target_table.date THEN
SKIP
*/

Unfortunately, the MERGE statement is not supported by open source Apache Spark. It is only supported by Databricks, its commercial distribution.

To do a merge using the Spark BigQuery connector, we need to follow the steps below:

Step 1: Create a dataframe with all the rows

val allRowsDF =
  incomingDF
    .unionByName(existingDF)

Step 1

Step 2: group by product and order each product occurrence by date descending

val orderingWindow =
  Window
    .partitionBy("product")
    .orderBy(col("date").desc, col("product"))

val orderedDF =
  allRowsDF
    .withColumn("rownum", row_number.over(orderingWindow))

Step 2

In step 2 above, the rows for each product are ordered by date with the most recent one first (descending order). We identify the most recent row by the rownum column.

Step 3: Keep the most recent product

val toKeepDF =
  orderedDF
    .where(col("rownum") === 1)
    .drop("rownum")

Step 3

Step 4: Overwrite existing partitions with the data we want to keep


val bqTable = "project-id.dataset.table"
val saveMode = SaveMode.Overwrite

toKeepDF.write
  .mode(saveMode)
  .partitionBy("date")
  .format("com.google.cloud.spark.bigquery")
  .option("table", bqTable)
  .save()

Step 4

Starlake How-to

Starlake is a declarative ingestion framework based on YAML description files.
The 4 ingestion strategies described above are supported through the settings below:

Schema Definition File
     name: "mydb"
directory: "..."
+ metadata:
schemas:
- name: "mytable"
pattern: "data-.*.csv"
metadata:
writeStrategy:
type: "OVERWRITE"
attributes:
- name: "date"
type: "date"
rename: "id"
- name: "product"
type: "string"
- name: "price"
type: "decimal"

Compare this with the manual Spark overwrite shown earlier.