Microsoft Fabric Updates Blog

Data Pipeline Performance Improvements Part 1: How to convert a time interval (dd.hh:mm:ss) into seconds

Series Overview

Welcome to this short series where we’ll be discussing the technical methods used to improve Data Pipeline Copy activity performance through parallelization by logically partitioning any source.

Often, we see solutions leveraging a single Copy Activity to move large volumes of data. While this works great, you might face a scenario where you need to improve the performance by reducing the time it takes to move data into your Fabric Lakehouse.

To improve performance, instead of using a single Copy Activity to move a large volume of data, we can have multiple Copy Activities moving smaller volumes in parallel. It doesn’t matter if the source is a REST API, Blob Storage, or a Transactional Database. In many cases, we can logically partition the source data into buckets then copy each bucket over to the destination.

An important tip to remember is for you, as the engineer, to take the time and understand your source and destination. You should know the maximum number of concurrent connections and other factors, then design your solution to utilize them to your advantage.

In this series, I will extend upon my previous blog post on ingesting files into a Lakehouse from a REST API with pagination ft. AVEVA Data Hub. There, we covered how to move parquet files from a REST API to Microsoft Fabric Lakehouse in a semi-single threaded method given these parameters:

  • StartIndex – start datetime
  • EndIndex – end datetime
  • Interval – Granularity of data, if interval is set to 1 minute, if provided a Start and End index for a 1-hour span, we will get back 60 records
  • Pagecount – number of records per file (limited by the source)

In the current design, the pipeline will take a considerable amount of time using a single Copy Activity if we provide a StartIndex and EndIndex spanning multiple years with a small Interval and Pagecount.

We can improve performance by creating many sub-time ranges based on the StartIndex, EndIndex, and Interval, then for each sub-time range, we call a child pipeline containing a Copy Activity, allowing multiple Copy Activities to execute at the same time, all handling a subset of data. This method can also be extended to any scenario when provided with some boundary condition. To provide some context, I’ve used this on a large SQL Table that had a datetime column by taking the Min and Max date then creating sub-time ranges. Using a ForEach Activity (Sequential = False, Batch Count = 50) and iterating over each range, I was able to execute many copy activities in parallel, taking processing time down from 6.5 hours to under 8 minutes.

The first two parts of this series are designed to ease you into two technical processes while the third and final part will bring everything together. By the end of this series, we will cover all of the tips and tricks needed to achieve these performance gains.

Part 1: How to convert a time interval (dd.hh:mm:ss) into seconds

Scenario

We have a Data Pipeline that has an Interval parameter (string) being passed in. This Interval is used to determine a future date and comes into the pipeline as dd.hh:mm:ss (e.g. 01.12:05:02). For this example, we need to add 1 day, 12 hours, 5 minutes, and 2 seconds to some date value we have.

Within the Data Pipeline Expression Builder, we have access to

  • addToTime()
  • adddays()
  • addseconds()
  • addminutes()
  • getFutureDate()

Because our smallest increment is Seconds, we will be leveraging the addseconds() function.

The parameter Interval is of type String, allowing us to parse the value into segments. We want to isolate each segment (days, hours, minutes, seconds), convert to seconds, then aggregate out results that we can leverage within the addseconds() function.

Using the split() function, we can split the string into an array given some delimiter.

For 01.12:05:02, if we use split(‘01.12:05:02’, ‘:’), our result is going to be an array: [01.12, 05, 02]

Notice that the Day and Hour parts both fell into the same index because the delimiter differs.

To capture only the Day part, we can nest the original split function inside another split function and reference a specific index of the array returned:

split(split(pipeline().parameters.interval, ‘:’)[0], ‘.’)

Returning only the Day part: 01.

Another challenge is the leading 0 as we will need the values to be of type integers.

Using the function, startswith() we can nest this logic inside an if() condition function that will check if the value starts with 0. If it does, then we will leverage the substring() function to return only the second index (non-zero) of that string.

@if(

startswith(

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

, '0')

, substring(

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

, 1

, 1

),

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

)

Given: pipeline().parameters.interval = 01.12:05:02

The Output: 1

Now that we know how to parse the Interval into the respective parts, we want to take each part and multiple by the number of seconds within each.

1 Day = 86400 Seconds

1 Hour = 3600 Seconds

1 Minute = 60 Seconds

1 Second = 1 Second

We can use the mul() function to multiply the output to the number of seconds. Mul() requires numeric values as arguments, so we need to cast the result of our existing expression to an integer using the int() function.

@mul(int(if(

startswith(

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

, '0')

, substring(

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

, 1

, 1

),

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

)),86400)

Given: pipeline().parameters.interval = 01.12:05:02

The Output: 86400

Next, we need to aggregate all these parts into a single number. To do this we can use the add() function. This additive function can only accept two arguments, so again we will need to nest this function into itself.

add(dd,

add(hh,

add(mm,ss)

 )

)

 Bringing everything together, our final expression is:

@string(add(mul(int(if(

startswith(

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

, '0')

, substring(

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

, 1

, 1

),

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

)),86400)

,add(mul(int(if(

startswith(

split(split(pipeline().parameters.interval, ':')[0], '.')[1]

, '0')

, substring(

split(split(pipeline().parameters.interval, ':')[0], '.')[1]

, 1

, 1

),

split(split(pipeline().parameters.interval, ':')[0], '.')[1]

)), 3600)

,add(mul(int(if(

startswith(

split(pipeline().parameters.interval, ':')[1]

, '0')

, substring(

split(pipeline().parameters.interval, ':')[1]

, 1

, 1

),

split(pipeline().parameters.interval, ':')[1]

)), 60)

,int(if(

startswith(

split(pipeline().parameters.interval, ':')[2]

, '0')

, substring(

split(pipeline().parameters.interval, ':')[2]

, 1

, 1

),

split(pipeline().parameters.interval, ':')[2]

))

))))

Given: pipeline().parameters.interval = 01.12:05:02

The Output: 129902

Example Run

In this example, we are converting the Interval parameter into seconds and using addSeconds() function to calculate a new date.

Set Variable Activity: Update StartIndex

@string(addSeconds(pipeline().parameters.StartIndex, int(variables(‘ParseInterval’))))

A screenshot of a computer

Description automatically generated

This concludes Part 1 of our series. In Part 2, we will go over how to create an Array of JSON objects. Then finally, in Part 3, we will tie everything together into a cohesive end-to-end solution.

As always, please feel free to leave questions and comments below. If there is a particular Data Pipeline scenario that you would like to see covered, please let me know in the comments. Thank you for reading!

Related blog posts

Data Pipeline Performance Improvements Part 1: How to convert a time interval (dd.hh:mm:ss) into seconds

April 9, 2024 by Meenal Srivastva

Create data pipelines in Fabric to access your firewall-enabled ADLS Gen2 storage accounts with ease and security. We are excited to announce a new feature in Fabric that enables you to create data pipelines to access your firewall-enabled Azure Data Lake Storage Gen2 (ADLS Gen2) accounts. This feature leverages the workspace identity to establish a … Continue reading “Introducing Trusted Workspace Access in Fabric Data Pipelines”

April 4, 2024 by Mark Kromer

We are always working on making the experience of building low-code data pipelines as easy as possible for our customers. Next week, we are going to roll-out a new experience in the Script activity in Fabric Data Factory pipelines to make it even easier to build expressions using the pipeline expression language.