Handling Dynamic Partitioning and Merge with Spark on BigQuery
Data Loading strategies
When loading data into BigQuery, you may want to:
- Overwrite the existing data and replace it with the incoming data.
- Append the incoming data to the existing data.
- Dynamic partition overwrite, where only the partitions that the incoming data belongs to are overwritten.
- Merge incoming data with existing data by keeping the newest version of each record.
For performance reasons, tables holding huge amounts of data are usually split
into multiple partitions. BigQuery supports range partitioning, which is uncommon, and date/time partitioning,
which is the most widely used type of partitioning.
The diagram below shows our initial table partitioned by the date field.
Let's assume we receive the following data that we need to ingest into the table:
The strategies above produce the following results, respectively:
- Overwrite: the table ends up with the 2 incoming records. All existing partitions are deleted.
- Append: the table ends up with 7 records. Note that a new item 1 record is added while the older one is kept.
- Dynamic partition Overwrite: the table ends up with 4 records. The second partition remains untouched while the first partition is erased and overwritten with the incoming data.
- Merge: the table ends up with 4 records. Incoming and existing records are combined, but only the newest version of each product is kept in the resulting table.
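As a rough illustration of the four behaviours, here is a minimal sketch in plain Scala that models the table as an in-memory sequence of rows. The ProductRow type, the LoadStrategies object and the sample rows are hypothetical (they are not the rows of the diagram), and nothing here talks to BigQuery or Spark.

```scala
import java.time.LocalDate

// Hypothetical row type: the table is assumed to be partitioned by `date`.
final case class ProductRow(date: LocalDate, product: String, price: BigDecimal)

object LoadStrategies {
  // Overwrite: the incoming rows replace the whole table.
  def overwrite(existing: Seq[ProductRow], incoming: Seq[ProductRow]): Seq[ProductRow] =
    incoming

  // Append: the incoming rows are added and older versions are kept.
  def append(existing: Seq[ProductRow], incoming: Seq[ProductRow]): Seq[ProductRow] =
    existing ++ incoming

  // Dynamic partition overwrite: only the partitions present in the
  // incoming data are replaced, the other partitions stay untouched.
  def overwriteByPartition(existing: Seq[ProductRow], incoming: Seq[ProductRow]): Seq[ProductRow] = {
    val touched = incoming.map(_.date).toSet
    existing.filterNot(row => touched.contains(row.date)) ++ incoming
  }

  // Merge: keep only the newest version of each product.
  def merge(existing: Seq[ProductRow], incoming: Seq[ProductRow]): Seq[ProductRow] =
    (existing ++ incoming)
      .groupBy(_.product)
      .values
      .map(_.maxBy(_.date.toEpochDay))
      .toSeq
}

object LoadStrategiesDemo {
  def main(args: Array[String]): Unit = {
    val d1 = LocalDate.parse("2022-01-01")
    val d2 = LocalDate.parse("2022-01-02")
    // Hypothetical sample: 3 existing rows in 2 partitions, 2 incoming rows.
    val existing = Seq(
      ProductRow(d1, "item 1", BigDecimal(10)),
      ProductRow(d1, "item 2", BigDecimal(20)),
      ProductRow(d2, "item 3", BigDecimal(30))
    )
    val incoming = Seq(
      ProductRow(d2, "item 1", BigDecimal(12)),
      ProductRow(d2, "item 4", BigDecimal(40))
    )
    println(LoadStrategies.overwrite(existing, incoming).size)            // 2
    println(LoadStrategies.append(existing, incoming).size)               // 5
    println(LoadStrategies.overwriteByPartition(existing, incoming).size) // 4
    println(LoadStrategies.merge(existing, incoming).size)                // 4
  }
}
```

With this sample, the dynamic partition overwrite drops the existing item 3 row because it lives in the partition being replaced, whereas the merge keeps it and only replaces the older item 1 row.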
There is no good or bad strategy; the right one depends on the use case. Some example use cases for each strategy are:
- Overwrite mode may be useful when you receive the list of all product names every day.
- Append mode may be useful when you receive daily sales.
- Dynamic Partition Overwrite mode may be useful when you have already ingested a partition and need to ingest it again with a different set of data, thus altering only that partition.
- Merge mode may be useful when you receive product updates every day and need to keep only the latest version of each product.
Spark How-to
The Apache Spark SQL connector for Google BigQuery makes BigQuery a first-class citizen as a source and sink for Spark jobs.
Append and Overwrite modes in Spark
BigQuery is supported by Spark as a source and sink through the Spark BigQuery connector.
Spark comes out of the box with the ability to append to or overwrite existing data using a predefined save mode:
import org.apache.spark.sql.SaveMode

val incomingDF = ... // Incoming data loaded with the correct schema
val bqTable = "project-id.dataset.table"
val saveMode = SaveMode.Overwrite // or SaveMode.Append to append the data
incomingDF.write
  .mode(saveMode)
  .partitionBy("date")
  .format("com.google.cloud.spark.bigquery")
  .option("table", bqTable)
  .save()
Dynamic Partition Overwrite mode in Spark
To activate dynamic partitioning, you need to set the configuration below before saving the data using the exact same code above:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","DYNAMIC")
Unfortunately, the BigQuery Spark connector does not support this feature (at the time of writing). We need to manually delete the partitions we want to overwrite first and then append the incoming data.
Assuming the table is partitioned by the date field and the incoming data is loaded in the incomingDF dataframe, the code below removes the existing partitions that need to be overwritten.
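import com.google.cloud.bigquery.{BigQueryOptions, TableId}
import org.apache.spark.sql.functions.{col, date_format}

val incomingDF = ... // Incoming data loaded with the correct schema
// One way to get a client: default credentials and project (an assumption)
val bigQueryClient = BigQueryOptions.getDefaultInstance.getService
// datasetName and table hold the dataset and table names
incomingDF
  .select(date_format(col("date"), "yyyyMMdd").cast("string"))
  .distinct()
  .collect()
  .map(_.getString(0))
  .foreach { partition =>
    // "$$" yields a literal "$", giving the partition id table$yyyyMMdd
    bigQueryClient.deleteTable(TableId.of(datasetName, s"$table$$$partition"))
  }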
To drop a table partition using the Google Cloud bq command line tool, you may use the following syntax:
bq rm -t 'project-id.dataset.table$YYYYMMDD'
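The $YYYYMMDD suffix is the partition decorator of a daily partition. As a small sketch of how such identifiers could be built outside of Spark, the hypothetical PartitionDecorator helper below formats a java.time.LocalDate with the standard BASIC_ISO_DATE formatter; the object and method names are illustrative only.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object PartitionDecorator {
  // BASIC_ISO_DATE renders a LocalDate as yyyyMMdd.
  private val dayFormat = DateTimeFormatter.BASIC_ISO_DATE

  // Builds "table$yyyyMMdd"; "$$" is a literal "$" in an interpolated string.
  def forDay(table: String, day: LocalDate): String =
    s"$table$$${day.format(dayFormat)}"

  // One decorator per distinct day, in order of first appearance.
  def forDays(table: String, days: Seq[LocalDate]): Seq[String] =
    days.distinct.map(day => forDay(table, day))
}

object PartitionDecoratorDemo {
  def main(args: Array[String]): Unit = {
    // prints mytable$20220131
    println(PartitionDecorator.forDay("mytable", LocalDate.of(2022, 1, 31)))
  }
}
```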
We now need to append incomingDF to mimic the dynamic partition overwrite feature:
val incomingDF = ... // Incoming data loaded with the correct schema
val bqTable = "project-id.dataset.table"
val saveMode = SaveMode.Append
incomingDF.write
  .mode(saveMode)
  .partitionBy("date")
  .format("com.google.cloud.spark.bigquery")
  .option("table", bqTable)
  .save()
The issue with this approach is that if the program crashes while appending the incoming data, the partitions will already have been deleted and their data lost. However, you can ingest the same file again after a failure and the end result will be the same.
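The reasoning above can be checked on a toy model. The sketch below is an assumption-laden illustration in plain Scala: the table is a hypothetical in-memory map from partition id to rows, and the DeleteThenAppend object is not part of any library. It shows that running the delete-then-append sequence again on a table left in a crashed state gives the same result as a run that never failed.

```scala
object DeleteThenAppend {
  // Hypothetical in-memory table: partition id (yyyyMMdd) -> rows of that partition.
  type Table = Map[String, Seq[String]]

  // Step 1: drop every partition present in the incoming data.
  def deletePartitions(table: Table, incoming: Table): Table =
    table -- incoming.keys

  // Step 2: append the incoming rows to their partitions.
  def appendRows(table: Table, incoming: Table): Table =
    incoming.foldLeft(table) { case (acc, (partition, rows)) =>
      acc.updated(partition, acc.getOrElse(partition, Seq.empty) ++ rows)
    }

  // The full ingestion: delete first, then append.
  def ingest(table: Table, incoming: Table): Table =
    appendRows(deletePartitions(table, incoming), incoming)
}

object DeleteThenAppendDemo {
  def main(args: Array[String]): Unit = {
    val table    = Map("20220101" -> Seq("a", "b"), "20220102" -> Seq("c"))
    val incoming = Map("20220101" -> Seq("x"))

    // Simulated crash: partitions deleted, nothing appended yet.
    val crashed = DeleteThenAppend.deletePartitions(table, incoming)

    // Re-ingesting after the crash matches a run that never failed.
    println(DeleteThenAppend.ingest(crashed, incoming) == DeleteThenAppend.ingest(table, incoming)) // true
  }
}
```

Between the crash and the retry, the deleted partitions are simply missing from the table, which is the window during which readers would see incomplete data.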
Dynamic Partition Merge in Spark
When you need to keep the last version of the record for each product, both BigQuery and Databricks (the company behind Spark, in case you have been living on the moon for the last ten years) support the MERGE SQL statement:
MERGE INTO target_table
USING incoming_table
ON target_table.product = incoming_table.product
WHEN NOT MATCHED THEN
  INSERT *
WHEN MATCHED AND incoming_table.date > target_table.date THEN
  UPDATE SET *
/*
WHEN MATCHED AND incoming_table.date <= target_table.date THEN
  SKIP
*/
Unfortunately, the MERGE statement is not supported by Apache Spark out of the box for tables written through the BigQuery connector. It is supported by Databricks, its commercial version.
To do a merge using the Spark BigQuery connector, we need to follow the steps below:
Step 1: Create a dataframe with all the rows
// existingDF is assumed to hold the current content of the table
val allRowsDF =
  incomingDF
    .unionByName(existingDF)
Step 2: Group by product and order the occurrences of each product by date, descending
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val orderingWindow =
  Window
    .partitionBy("product")
    .orderBy(col("date").desc, col("product"))
val orderedDF =
  allRowsDF
    .withColumn("rownum", row_number().over(orderingWindow))
In step 2 above, the rows of each product are ordered by date with the most recent one first (descending order).
The most recent row of each product is the one whose rownum column equals 1.
Step 3: Keep the most recent version of each product
val toKeepDF =
  orderedDF
    .where(col("rownum") === 1)
    .drop("rownum")
Step 4: Overwrite existing partitions with the data we want to keep
val bqTable = "project-id.dataset.table"
val saveMode = SaveMode.Overwrite
toKeepDF.write
  .mode(saveMode)
  .partitionBy("date")
  .format("com.google.cloud.spark.bigquery")
  .option("table", bqTable)
  .save()
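Steps 1 to 3 can be tried locally without BigQuery. The sketch below assumes spark-sql is on the classpath and uses a local SparkSession; the MergeSketch object, the keepNewest helper and the sample rows are hypothetical, and the final write of step 4 is deliberately left out.

```scala
import java.sql.Date

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object MergeSketch {
  // Steps 1 to 3: union both dataframes, rank the rows of each product
  // by date (newest first) and keep only the first one.
  def keepNewest(existingDF: DataFrame, incomingDF: DataFrame): DataFrame = {
    val orderingWindow = Window.partitionBy("product").orderBy(col("date").desc)
    incomingDF
      .unionByName(existingDF)
      .withColumn("rownum", row_number().over(orderingWindow))
      .where(col("rownum") === 1)
      .drop("rownum")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("merge-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows standing in for the table and the incoming file.
    val existingDF = Seq(
      (Date.valueOf("2022-01-01"), "item 1", 10.0),
      (Date.valueOf("2022-01-01"), "item 2", 20.0)
    ).toDF("date", "product", "price")
    val incomingDF = Seq(
      (Date.valueOf("2022-01-02"), "item 1", 12.0),
      (Date.valueOf("2022-01-02"), "item 3", 30.0)
    ).toDF("date", "product", "price")

    // Three rows are expected: the newer item 1, the untouched item 2
    // and the new item 3.
    keepNewest(existingDF, incomingDF).orderBy("product").show()
    spark.stop()
  }
}
```

If two versions of a product share the same date, row_number picks one of them arbitrarily, so a finer-grained timestamp column would be a safer ordering key in that case.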
Starlake How-to
Starlake is a declarative Ingestion Framework based on YAML description files.
The 4 ingestion strategies described above are supported through the settings below:
- Overwrite
- Append
- Dynamic partition Overwrite
- Merge
Overwrite:
name: "mydb"
directory: "..."
+ metadata:
schemas:
  - name: "mytable"
    pattern: "data-.*.csv"
    metadata:
      writeStrategy:
        type: "OVERWRITE"
    attributes:
      - name: "date"
        type: "date"
        rename: "id"
      - name: "product"
        type: "string"
      - name: "price"
        type: "decimal"
Append:
name: "mydb"
directory: "..."
+ metadata:
schemas:
  - name: "mytable"
    pattern: "data-.*.csv"
    metadata:
      writeStrategy:
        type: "APPEND"
    attributes:
      - name: "date"
        type: "date"
        rename: "id"
      - name: "product"
        type: "string"
      - name: "price"
        type: "decimal"
Dynamic partition Overwrite:
name: "mydb"
directory: "..."
+ metadata:
schemas:
  - name: "mytable"
    pattern: "data-.*.csv"
    metadata:
      strategy:
        type: "OVERWRITE_BY_PARTITION"
      sink:
        partition:
          - "date"
    attributes:
      - name: "date"
        type: "date"
        rename: "id"
      - name: "product"
        type: "string"
      - name: "price"
        type: "decimal"
Merge:
name: "mydb"
directory: "..."
+ metadata:
schemas:
  - name: "mytable"
    pattern: "data-.*.csv"
    metadata:
      strategy:
        type: "OVERWRITE_BY_KEY_AND_TIMESTAMP"
        key: ["product"]
        timestamp: "date"
    attributes:
      - name: "date"
        type: "date"
      - name: "product"
        type: "string"
      - name: "price"
        type: "decimal"