
Slowly Changing Dimensions (SCD Type 2) with Delta and Databricks

From Warehouse to Lakehouse Pt.2

 

SCD Type 2 in SQL and Python


Introduction

For more information on this blog series and Slowly Changing Dimensions with Databricks and Delta Lake, check out SCD Type 1 in part 1 of the ‘From Warehouse to Lakehouse’ series:
https://headinthecloud.blog/2021/08/17/from-warehouse-to-lakehouse-slowly-changing-dimensions-scd-with-delta-and-sql/

All code examples are available in SQL and Python (PySpark) from my GitHub repo so you can follow along:
https://github.com/cwilliams87/Blog-SCDs

Notebooks – ‘SQL\SCD-Type2’ & ‘Python\SCD-Type2’

SCD Type 2 – Add a new row (with active row indicators or dates)

[Image: SCD Type 2 - 2.png – example Type 2 dimension table with start_date and end_date columns]

A Type 2 SCD is probably one of the most common approaches to preserving history in a dimension table, and it is used throughout Data Warehousing/Modelling architectures. Active rows can be indicated with a boolean flag or with a start and end date. In the example table above, all active rows can be displayed simply by querying for a null end date.

For example:

SELECT * FROM type2Table WHERE end_date IS NULL
 

Or in Python:

type2TableDF.where("end_date IS NULL")
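If a boolean flag is used instead of dates, the equivalent filter is just as simple. This is a minimal sketch, assuming a hypothetical [is_current] flag column that is not part of this example’s table:

# Hypothetical variant: filter on a boolean flag instead of a null end date
type2TableDF.where("is_current = true")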
 

To implement this SCD type we need to add a number of columns to the existing table. Firstly, a [start_date] and an [end_date] are required to act as active row indicators, and secondly a surrogate key, denoted as [id], is needed because duplicates will occur in the [employee_id] column when record changes are added as new rows.
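For reference, the resulting table structure looks something like the following. This is only a sketch: the column names come from the examples in this post, but the data types are assumed for illustration.

# A sketch of the Type 2 dimension table (data types assumed for illustration)
spark.sql("""
  CREATE TABLE IF NOT EXISTS scdType2 (
    id INT,                  -- surrogate key, unique per row version
    employee_id INT,         -- business key, repeats across row versions
    first_name STRING,
    last_name STRING,
    gender STRING,
    address_street STRING,
    address_city STRING,
    address_country STRING,
    email STRING,
    job_title STRING,
    start_date DATE,         -- active row indicator: when this version began
    end_date DATE            -- NULL while this version is the active row
  ) USING DELTA
""")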

For more information on Type 2 SCD’s as proposed by the Kimball Group:

https://www.kimballgroup.com/data-warehouse-business-intelligence-resources/kimball-techniques/dimensional-modeling-techniques/type-2/

Scenario 2: Using the employee dataset (as seen in part 1), two employees, Fred Flintoff and Hilary Casillis, have changes to be made.
Fred needs a change of address, whereas Hilary has recently got married and will be changing her [last_name].

New rows to MERGE into SCD Type 2 table

We can once again perform this operation using the MERGE operation from the previous example (Part 1). However, as there are essentially two events happening for each changed row (expire the existing version and insert the new one), we need to create a composite of the insertion table to drive the two operations:

SQL

-- Example ChangeRows table
SELECT
  null AS id, employee_id, first_name, last_name, gender, address_street,
  address_city, address_country, email, job_title,
  current_date AS start_date, null AS end_date
FROM scdType2NEW
UNION ALL
SELECT
  id, employee_id, first_name, last_name, gender, address_street,
  address_city, address_country, email, job_title, start_date, end_date
FROM scdType2
WHERE employee_id IN (SELECT employee_id FROM scdType2NEW)
 

Python

# Imports required for the examples below
from pyspark.sql.functions import col, collect_list

# Create list of selected employee_id's
empList = scd2Temp.select(
    collect_list(scd2Temp['employee_id'])
).collect()[0][0]

# Select columns in new dataframe to merge
scd2Temp = scd2Temp.selectExpr(
    "null AS id", "employee_id", "first_name", "last_name", "gender",
    "address_street", "address_city", "address_country", "email",
    "job_title", "current_date AS start_date", "null AS end_date"
)

# Union incoming rows with their existing counterparts
scd2Temp = scd2Temp.unionByName(
    scdType2DF.where(col("employee_id").isin(empList)),
    allowMissingColumns=True
)

# Preview results
display(scd2Temp)
Query showing new records with their existing counterparts

As you can see in the example above, we have been able to UNION the new rows to insert with their existing counterparts, so the original records are displayed alongside them. Notice that the [id] values are null for the new records; this is what will trigger the differing behaviours in the MERGE process. So if we put these points together with the MERGE operation…

SQL

-- Merge scdType2NEW dataset into existing
MERGE INTO scdType2
USING

-- Composite table of rows to match (new and old, as seen in the example above)
( SELECT
    null AS id, employee_id, first_name, last_name, gender, address_street,
    address_city, address_country, email, job_title,
    current_date AS start_date, null AS end_date
  FROM scdType2NEW
  UNION ALL
  SELECT
    id, employee_id, first_name, last_name, gender, address_street,
    address_city, address_country, email, job_title, start_date, end_date
  FROM scdType2
  WHERE employee_id IN (SELECT employee_id FROM scdType2NEW)
) scdChangeRows

-- based on the following column(s)
ON scdType2.id = scdChangeRows.id

-- if there is a match, expire the existing row…
WHEN MATCHED THEN
  UPDATE SET scdType2.end_date = current_date()

-- if there is no match, insert the new row
WHEN NOT MATCHED THEN INSERT *
 
 

Python

# Imports for the Delta merge
from delta.tables import DeltaTable
from pyspark.sql.functions import current_date

# Reference the existing Delta table
deltaTable = DeltaTable.forName(spark, "scdType2")

# Merge Delta table with new dataset
(
    deltaTable.alias("original2")

    # Merge using the following conditions
    .merge(
        scdChangeRows.alias("updates2"),
        "original2.id = updates2.id"
    )

    # When matched, expire the existing row by setting its end_date
    .whenMatchedUpdate(
        set={"end_date": current_date()}
    )

    # When not matched, INSERT ALL columns of the new row
    .whenNotMatchedInsertAll()

    # Execute
    .execute()
)
 

As you can see in the code example above, if there is a match on the [id] column from scdChangeRows we simply update the [end_date] with the current date, thus marking the row as expired. The new rows are then inserted from the scdChangeRows table.

See Databricks notebook for further clarification.
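As a quick sanity check (a sketch rather than part of the original notebook, and assuming scdType2DF was created with spark.table so it reflects the merged state), querying the affected employees should now show the expired originals stamped with today’s [end_date] alongside the new active versions, reusing the empList built earlier:

# Expired versions now carry today's end_date; new versions have a null end_date
display(
    scdType2DF
    .where(col("employee_id").isin(empList))
    .orderBy("employee_id", "start_date")
)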

What about the [id] column, I hear you ask? You are correct to point out that this method doesn’t add new values for the inserted rows. This is sadly because Delta does not currently (as of Aug 2021) support Auto Increment, the classic SQL column attribute that would be used to easily populate an ID for incoming rows. We can, however, replace that functionality with a ROW_NUMBER window function to regenerate the sequential integers. For the purposes of this example, this is done with a quick INSERT OVERWRITE statement, which sadly must be performed separately as it is not supported from within the MERGE operation itself.

SQL

-- Order nulls to the end and recreate row numbers (as Delta does not currently support auto incrementals)
INSERT OVERWRITE scdType2
SELECT
  ROW_NUMBER() OVER (ORDER BY id NULLS LAST) - 1 AS id,
  employee_id, first_name, last_name, gender, address_street,
  address_city, address_country, email, job_title, start_date, end_date
FROM scdType2
 

Python

# Recreate sequential surrogate keys and overwrite the table
scdType2DF.selectExpr(
    "ROW_NUMBER() OVER (ORDER BY id NULLS LAST) - 1 AS id",
    "employee_id", "first_name", "last_name", "gender", "address_street",
    "address_city", "address_country", "email", "job_title",
    "start_date", "end_date"
).write.insertInto(tableName="scdType2", overwrite=True)
 
[Image: SCD Type 2 - 5.png – final query results, limited to the four affected rows]

So there we are: in the final SELECT query shown above (limited to just the four affected rows) we can see the new versions with a null [end_date] and their amended values, alongside the expired originals!

I hope this helps, next time we’ll look at an approach to perform SCD Type 3!

Please use the notebooks provided in my GitHub repo for a more in-depth example.


Author

Chris Williams