Microsoft Fabric Updates Blog

Bridging Fabric Lakehouses: Delta Change Data Feed for Seamless ETL

Within Microsoft Fabric, Delta Tables serve as a common file/table format. These tables find application both in the Data Warehouse and as managed tables within the Lakehouse. Their versatility extends to several features and functionalities, making them indispensable in various use cases. One such feature is the Delta Change Data Feed. While not new to Microsoft Fabric, this article will demonstrate how to leverage the Delta Change Data Feed to facilitate seamless data synchronization across different lakehouses in your medallion architecture.

Whether you’re performing ETL or ELT operations using Spark or SQL, the Delta Change Data Feed ensures that changes propagate efficiently. As we explore the intricacies of this feature, remember that its impact extends beyond individual use cases—it empowers your entire data ecosystem. 

Delta Tables: A Brief Overview

Delta Tables, at their core, facilitate efficient data ingestion and real-time processing. Their optimized performance makes them an attractive choice for businesses seeking to maximize the value of their data.

Let’s explore some of the essential aspects of Delta Tables:

ACID Transactions: Delta Tables fully support ACID (Atomicity, Consistency, Isolation, Durability) transactions. This means that you can read from and write to Delta tables within the same transaction, ensuring data consistency. Concurrent modifications are handled gracefully, preventing conflicts.

Time Travel Capabilities: Delta Tables allow you to traverse time, enabling historical analysis. You can query data as it existed at specific points in time, making it easier to track changes and understand data evolution.

Integrated File Management: Managing files associated with data tables can be cumbersome. Delta Tables simplify this by seamlessly integrating file management. Whether you’re dealing with Amazon S3, Azure Blob Storage, or HDFS, Delta Tables streamline the process.

What is Change Data Feed?

Change data feed allows Spark computes to track row-level changes between versions of a Delta table. When enabled on a Delta table, the runtime records change events for all the data written into the table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated.

You can read the change events in batch queries using Spark SQL, Apache Spark DataFrames, and Structured Streaming.

View further documentation here: Change data feed — Delta Lake Documentation

What are some use cases for Change Data Feed?

You can review the full documentation here Use Delta Lake change data feed on Azure Databricks – Azure Databricks | Microsoft Learn. The documentation specifies the use on Azure Databricks, but documentation is applicable to Microsoft Fabric as well.

  • Silver and Gold tables: Improve Delta Lake performance by processing only row-level changes following initial MERGEUPDATE, or DELETE operations to accelerate and simplify ETL and ELT operations.
  • Materialized views: Create up-to-date, aggregated views of information for use in BI and analytics without having to reprocess the full underlying tables, instead updating only where changes have come through.
  • Transmit changes: Send a change data feed to downstream systems such as Kafka or RDBMS that can use it to incrementally process in later stages of data pipelines.
  • Audit trail table: Capture the change data feed as a Delta table provides perpetual storage and efficient query capability to see all changes over time, including when deletes occur and what updates were made.

ETL/ELT Scenario

We are going to walk through a common ETL/ELT pattern within a medallion architecture. The following scenario will walkthrough enabling change data feed on a delta table and simulate raw data landing in storage -> loading into a silver Lakehouse table -> updating a gold Lakehouse table. The data will be centered around hospital vaccination metrics and goal will be to update a table in the silver layer Lakehouse and have those changes captured and fed to the gold Lakehouse table using the change data feed.

If you are unfamiliar with medallion architecture or would like a refresher then refer to this documentation. Implement medallion lakehouse architecture in Microsoft Fabric – Microsoft Fabric | Microsoft Learn

Prerequisites:

  • Fabric capacity (free trial or paid version)
  • 2 different Fabric Lakehouses. The demo will have the names ‘SilverLakehouse’ and ‘GoldLakehouse’

The scenario will involve the below assets:

  • 1 Juptyer Notebook to execute our code
  • 2 different Fabric Lakehouses named: ‘SilverLakehouse’ and ‘GoldLakehouse’
  • 2 Tables named: ‘Silver_HospitalVaccination’ and ‘Gold_HospitalVaccination’

Scenario Set Up

In your notebook, set your default Lakehouse to the ‘SilverLakehouse’.

In the below image:

  • Red: Your data sources. Can add new ones or view existing as well as change your default data source.
  • Green: Current data source. Can be different from the default.
  • Black: Shows your current workspace that the notebook is running in.
  • Blue: Shows your connected data sources.
  • Yellow: Shows the default data source, shown as a pin icon next to the data source.

Scenario Walkthrough

For simplicity, we will only be working with 2 Lakehouses, and simulating the bronze layer. The below code will load our raw data into a Delta Table, ‘Silver_HospitalVaccination’, in the ‘SilverLakehouse’ (our default Lakehouse).


Hospitals = [("Contoso_SouthEast", 10000, 20000), ("Contoso_NorthEast", 1000, 1500), ("Contoso_West", 7000, 10000), ("Contoso_North", 500, 700) ]
columns = ["Hospital","NumVaccinated","AvailableDoses", ]
spark.createDataFrame(data=Hospitals, schema = columns).write.format("delta").mode("overwrite").saveAsTable("Silver_HospitalVaccination")

Let’s view our silver table with SQL with the below code.

%%sql
SELECT * FROM SilverLakehouse.Silver_HospitalVaccination
Viewing the SilverLakehouse to verify our new table

Next, we will create/overwrite our new table in the ‘GoldLakehouse’ so we can have a baseline for the data to start capturing our changes. We are also going to make this gold layer table more “business ready”:

  • Dropping the columns ‘NumVaccinated‘ and ‘AvailableDoses
  • Adding the columns ‘VaccinationRate‘ and ‘DeletedFlag
    • VaccinationRate‘ is a calculation of ‘NumVaccinated‘/’AvailableDoses
    • DeletedFlag‘ is to identify if rows have been deleted from a previous layer.

One change from the previous syntax used, is demonstrating that we can use the abfss file location to create/refer to a delta table. Either syntax will work, and further demonstrates the capability of Fabric and OneLake.

#Create/overwrite a table in a different lakehouse. This time we use the abfss file path instead of the shorthand version that will be used later to create/overwrite the delta table in a different lakehouse (our gold lakehouse)

import pyspark.sql.functions as F
spark.read.format("delta").table("SilverLakehouse.Silver_HospitalVaccination").withColumn("VaccinationRate", F.col("NumVaccinated") / F.col("AvailableDoses")).withColumn("DeletedFlag", F.lit("N")) \
  .drop("NumVaccinated").drop("AvailableDoses") \
  .write.format("delta").mode("overwrite").save("abfss://FabricDemo@onelake.dfs.fabric.microsoft.com/GoldLakehouse.Lakehouse/Tables/Gold_HospitalVaccination")
%%sql
SELECT * FROM GoldLakehouse.Gold_HospitalVaccination

Let’s view the new gold table within the ‘GoldLakehouse’

Now that the tables have been created and they are in sync, let’s enable the Change Data Feed on the Silver Lakehouse table to start capturing our changes.

%%sql
ALTER TABLE SilverLakehouse.Silver_HospitalVaccination SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

Or we could have enabled this setting when creating the table like below.

CREATE TABLE myNewTable (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)

With Change Data Feed enabled on our table, we can start to track our changes. So, we will make 3 changes to the ‘SilverLakehouse’ table (1 update, 1 delete, and 1 insert). The goal is to identify what has changed in the Silver table and be able to replicate those changes in the ‘GoldLakehouse’ version.

%%sql
-- Update the silver lakehouse
UPDATE SilverLakehouse.Silver_HospitalVaccination SET NumVaccinated = '11000' WHERE Hospital = 'Contoso_SouthEast'
%%sql
-- delete a record
DELETE from SilverLakehouse.Silver_HospitalVaccination WHERE Hospital = 'Contoso_NorthEast'
%%sql
-- Insert a record
INSERT INTO SilverLakehouse.Silver_HospitalVaccination VALUES ('Contoso_East', 500, 1500)

Now we will view our changes in the Silver Lakehouse table.

%%sql
-- View the changes in the SQL table
SELECT * FROM SilverLakehouse.Silver_HospitalVaccination

This is great that we can see the changes in the SQL table, however how can you view the Change Data Feed data to see exactly what happened? We can use the DESCRIBE function.

In the results image, there is a good number of metadata columns associated with the changes but for simplicity, we will focus on the column’s ‘version’, ‘operation’, and ‘operationParameters’.

An important row/version number is when the table was enabled with Change Data Feed, as we will only be able to use the Change Data Feed functionality from this version forward. You can see in the results that version = 1 when we altered the table properties to enable this. This will be important later on as you will see this value used in the SQL or Python code.

%%sql 
-- view the changes using describe in SQL
describe history Silver_HospitalVaccination;

This information is useful, but to obtain a more user-friendly and informative query result that clearly indicates the changed values, the different versions of those changes, or even a specific version or commit, we can utilize the ‘table_changes’ table-valued function.

For more information on this table-valued function: table_changes table-valued function – Azure Databricks – Databricks SQL | Microsoft Learn

%%sql
-- View a specific timestamp/commit in SQL
SELECT * FROM table_changes('SilverLakehouse.Silver_HospitalVaccination', 1) order by _commit_timestamp DESC;
In this image we can see the before and after values from our 3 changes that occurred in our previous work

You can also view this same data using Python. Note the starting version is important here as well.

#Let's view the change data using PySpark

changes_df = spark.read.format("delta").option("readChangeData", True).option("startingVersion", 1).table('SilverLakehouse.Silver_HospitalVaccination')
display(changes_df)
Same results as our SQL cell

This information is extremely valuable, so how can we integrate it into our ETL/automated processes? We will establish a temporary view. Refer to the code for inline comments regarding the view.

This can be parameterized for more advanced uses to where you can only grab the newest changes instead of all the changes but that is more advanced and won’t be covered here.

%%sql
-- Collect only the latest version for each Hospital by using a view to set up for our merge statement
CREATE OR REPLACE TEMPORARY VIEW Silver_HospitalVaccination_latest_version as
SELECT * 
    FROM 
         (SELECT *, rank() over (partition by Hospital order by _commit_version desc) as rank
          FROM table_changes('SilverLakehouse.Silver_HospitalVaccination', 1)
          WHERE _change_type !='update_preimage')--filters the 'before' values so we can only grab the updated values and not the old values
    WHERE rank=1 --if multiple changes occurred during a single commit then we get the most recent version of that

Let’s view the output of this view, it will look extremely similar to our other table_changes query.

Creating a view simplifies and encapsulates the logic, making the joins and utilization of this data easier.

%%sql
SELECT * FROM Silver_HospitalVaccination_latest_version

Now that we have set up the Change Data Feed and associated views to continuously identify what has changed at the silver layer, we will build in our code to promote these changes to the next environment, our ‘GoldLakehouse’ table.

To accomplish this, we will be using a SQL MERGE statement. We will execute this from the ‘SilverLakehouse’ and write to our ‘GoldLakehouse’. Further demonstrating the cross Lakehouse ability in Fabric.

The goal of this MERGE statement is to (review inline comments for further explanation):

  1. Insert any new rows to the GoldLakehouse table.
  2. Update an existing row based on the join criteria and values matching.
  3. Update our ‘DeletedFlag’ column for rows that have been deleted.

There are multiple methods to manage changes, and each organization or data model has unique requirements. Whether there’s a need to entirely overwrite values without retaining history, establish a type-2 slowly changing dimension, or integrate various needs, the Change Data Feed can serve as the foundation for your process.

%%sql
-- Merge the changes to gold table, across lakehouses. Using the Silver Lakehouse as the default lakehouse
MERGE INTO GoldLakehouse.Gold_HospitalVaccination t USING Silver_HospitalVaccination_latest_version s ON s.Hospital = t.Hospital --Joining our Gold Lakehouse to our view above on the Hospital
        WHEN MATCHED AND s._change_type='update_postimage' THEN UPDATE SET VaccinationRate = s.NumVaccinated/s.AvailableDoses -- When an update occurs, perform the neccessary calculations/transformation into our Gold table
        WHEN MATCHED AND s._change_type='delete' THEN UPDATE SET DeletedFlag = 'Y' -- If a hospital is deleted then we want to update the deleted flag to 'Y' and preserve the row
        WHEN NOT MATCHED THEN INSERT (Hospital, VaccinationRate, DeletedFlag) VALUES (s.Hospital, s.NumVaccinated/s.AvailableDoses, 'N')-- Insert new hospitals with our transformation logic or default values
Output provides a summary of the affected rows for inserted, updated, and deleted.

Now that our changes have been promoted to our ‘GoldLakehouse’, let’s view those new changes in our ‘Gold_HospitalVaccination’ table and compare to our starting values.

%%sql
SELECT * FROM GoldLakehouse.Gold_HospitalVaccination 

Our original Gold table.

Our updated Gold table with the changes highlighted.

Conclusion

Microsoft Fabric is equipped with powerful functionality to ensure a seamless ETL experience tailored to your requirements. In this example, we demonstrated how to utilize the delta change data feed to effortlessly transfer and transform your data across various Lakehouses. This is a basic illustration, which you can expand upon by incorporating parameterization, intricate logic for transformation/modeling purposes, or integrating this process into your comprehensive ETL/ELT workflow alongside other Fabric components, such as Fabric Pipelines, Power BI reports, and more.

Bài đăng blog có liên quan

Bridging Fabric Lakehouses: Delta Change Data Feed for Seamless ETL

tháng 10 29, 2024 của Dandan Zhang

Managed private endpoints allow Fabric experiences to securely access data sources without exposing them to the public network or requiring complex network configurations. We announced General Availability for Managed Private Endpoint in Fabric in May of this year. Learn more here: Announcing General Availability of Fabric Private Links, Trusted Workspace Access, and Managed Private Endpoints. … Continue reading “APIs for Managed Private Endpoint are now available”

tháng 10 28, 2024 của Estera Kot

We’re thrilled to announce that the Native Execution Engine is now available at no additional cost, unlocking next-level performance and efficiency for your workloads. What’s New?  The Native Execution Engine now supports Fabric Runtime 1.3, which includes Apache Spark 3.5 and Delta Lake 3.2. This upgrade enhances Microsoft Fabric’s Data Engineering and Data Science workflows, … Continue reading “Native Execution Engine available at no additional cost!”