Microsoft Fabric Updates Blog

Data Pipeline Performance Improvements Part 2: Creating an Array of JSONs

Welcome back to Part 2 of this 3-part series on optimizing Data pipelines for historical loads. The first two parts each introduce a technical pattern; in Part 3, we will bring everything together, covering an end-to-end design pattern.

To recap, in Part 1 we covered how to parse a time interval (dd.hh:mm:ss) into seconds. This introduced the Expression Builder and how to leverage built-in functions.

Today in Part 2, I will cover two examples:

  1. Merging two Arrays into an Array of JSON Objects
  2. Taking a date range, creating multiple sub-ranges, and storing these as an Array of JSONs

Example 1: Merging two Arrays into an Array of JSON Objects

Scenario

In Data pipelines, we may have two arrays that need to be merged into a nested array. A great method for this is to create an array of JSON objects. By iterating over that array with a ForEach activity, each item() is a JSON object, and the fields within the object can be referenced using dot (.) notation.
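For example, if each item() is the JSON object {"FirstName":"Sean","LastName":"Mira"}, an inner activity could reference the fields with expressions like the following (a minimal sketch):

@item().FirstName
@item().LastName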

In this tutorial, we will go through the process of creating an array of JSON objects.

For this example, we have two Arrays* passed as parameters into our Data pipeline:

FirstName = ["Sean","John","Henry","Kevin"]

LastName = ["Mira","Smith","Pitt","Lee"]

*Assume the length of the arrays will always match.

Before we create the array of JSON objects, let’s create a FinalArray that merges the two arrays together:

FinalArray = ["Sean Mira", "John Smith", "Henry Pitt", "Kevin Lee"]


Let’s create a variable of type array, called Range. We will dynamically create this range based on the length of one of our input arrays. By doing this, we can leverage a ForEach activity to iterate over this array of integers, where each integer is an index of our source array.


The expression for our variable, Range, is:

@range(0,length(pipeline().parameters.FirstName))
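With our four-element FirstName array, this expression evaluates to [0,1,2,3], one index per name.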

Next, we will create the ForEach activity. Since we will be referencing the source array values by index, we can disable the Sequential box and input a Batch count. This will allow for the inner activities to run in parallel, drastically improving performance.

Remember, by default, Batch count is 20 and the upper limit is 50.


Inside the ForEach activity, we will use the Append Variable activity to insert items into the variable TempArray. We are using TempArray as a placeholder; later, we will set FinalArray = TempArray so that it's easier to visualize the final output. You do not need to do this and can leverage FinalArray directly within the ForEach; however, this trick can make debugging easier during development.


Value is set to:

@concat(pipeline().parameters.FirstName[item()], ' ', pipeline().parameters.LastName[item()])

In this expression, we are concatenating FirstName[] with LastName[], leveraging item() as the index.
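For example, on the iteration where item() is 0, the expression reads FirstName[0] and LastName[0] and appends "Sean Mira" to TempArray.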


Next, we will set FinalArray to TempArray for easy visibility into all the values that we appended.

When we run this Data pipeline, we can easily see that we successfully merged these two source Arrays.


However, our scenario calls for FinalArray to be an Array of JSON objects with the fields FirstName and LastName.

Desired Final Array:

[{"FirstName": "Sean", "LastName": "Mira"},
{"FirstName": "Kevin", "LastName": "Lee"},
{"FirstName": "Henry", "LastName": "Pitt"},
{"FirstName": "John", "LastName": "Smith"}]

(The order may differ from the source arrays because the ForEach appends in parallel.)

To do this, we are going to leverage the json() function within the Expression Builder and enhance the values within the concat() function to include the JSON schema:

@json(
    concat(
        '{"FirstName":"',
        pipeline().parameters.FirstName[item()],
        '", "LastName":"',
        pipeline().parameters.LastName[item()],
        '"}'
    )
)
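For item() = 0, this expression evaluates to the JSON object {"FirstName":"Sean","LastName":"Mira"} rather than a plain string, which is what allows dot (.) notation later on.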

Now that we have updated the expression, our FinalArray is an Array of JSON objects.

To leverage this, we can iterate over each name and invoke a child pipeline that has two parameters: FirstName and LastName. We would create another ForEach activity with an inner Invoke Pipeline activity. Since item() is a JSON object, we can use dot (.) notation to reference each key.
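As a minimal sketch, assuming the child pipeline's parameters are also named FirstName and LastName, the parameter values in the Invoke Pipeline activity would be:

@item().FirstName
@item().LastName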


Example 2: Take a date range and create multiple sub-ranges to store as an Array of JSONs

For context, in some historical load scenarios, we specify a date range to indicate how far back we want to bring historical data into our analytical system. In Data pipelines, a single Copy activity can achieve great performance for these historical loads. However, there are scenarios where we need to ramp up performance to meet strict requirements. A design pattern that achieves optimal performance is to logically partition the historical range into smaller ranges and then pass each smaller range into its own Copy activity, allowing multiple processes to run in parallel. By creating an Array of JSON objects, you can loop through the array with a ForEach activity. Within this ForEach activity, we can store our data movement logic and use Dynamic Content to reference the specific values we are iterating against.

Please remember that it is important to understand the limitations of your source and destination systems and design your solution with them in mind.

Parameters

  • startIndex – start datetime
  • endIndex – end datetime
  • interval – time interval offset
  • BucketIntervalInDays – the number of days between each sub-range's startIndex and endIndex

Variables:

  • IntervalDatePart – the time interval offset converted into seconds (set by the Parse Interval activity from Part 1)
  • TickInterval – the parameter BucketIntervalInDays converted into ticks
  • GetDateDiff – the number of sub-ranges to create
  • Range – an integer array running from 0 to GetDateDiff
  • Boundaries – the Array of JSON objects, one object per sub-range

Data Pipeline


Summary of Data Pipeline Activities

Parse Interval: Covered in Part 1 of this series – uses the Expression Language to parse and convert the time interval offset from dd.HH:mm:ss into seconds (ss)

Set TickInterval: Converts the parameter BucketIntervalInDays into ticks (days -> ticks)

Set GetDateDiff: Uses the function ticks() to calculate the number of sub-ranges we will create. If the ranges do not divide evenly, we will create an additional range for the remainder

Ticks(): Return the ticks property value for a specified timestamp. A tick is a 100-nanosecond interval.

Set Range: Uses the range() function to create an Array ranging from 0 to the number of sub ranges needed.

Range(): Generates an array of integers starting from a certain number, and you define the length of the returned array

Create Array of JSONs: ForEach Activity that iterates over the variable Range

Append New JSON: Creates a new JSON object and appends this object to the variable Boundaries

Set List of Date Ranges: Purely a visual/monitoring activity used to see a consolidated view of the array we appended to.

Set TickInterval

In this Data pipeline, we are allowing the end-user to specify the number of days each sub-range spans. We need to convert the parameter BucketIntervalInDays, of type integer, into something that we can compare with a datetime value. To do this, we can use the mul() function to multiply BucketIntervalInDays by the number of ticks (100-nanosecond intervals) within a day (864,000,000,000) and set this value to the variable TickInterval.

Expression
@string(
    mul(
        int(pipeline().parameters.BucketIntervalInDays),
        864000000000
    )
)
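For example, if BucketIntervalInDays = 7, TickInterval becomes 7 * 864,000,000,000 = 6,048,000,000,000 ticks (seven days expressed as 100-nanosecond intervals).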

Set GetDateDiff

If you are given two date or datetime values and want to calculate the difference between them, you can use the functions sub() and ticks() to subtract the ticks of the smaller date from the ticks of the larger date.

sub(
    ticks(pipeline().parameters.endIndex),
    ticks(pipeline().parameters.startIndex)
)
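For example, with an assumed startIndex of 2024-01-01T00:00:00 and endIndex of 2024-01-15T00:00:00, this returns the ticks in 14 days: 14 * 864,000,000,000 = 12,096,000,000,000.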

In our scenario, we are trying to determine the right number of sub-ranges to create given the parameter BucketIntervalInDays. First, we need to determine whether the interval divides the overall range evenly; otherwise, we will need an additional range to hold the remainder. To determine whether two values are evenly divisible, we can use the function mod() to take the modulus. If the result of mod() is anything other than 0, we will need an additional sub-range, so we use the function add() to add 1 to the quotient.

Mod(): Return the remainder from dividing two numbers.
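Continuing the assumed 14-day example with 7-day buckets, mod(12,096,000,000,000, 6,048,000,000,000) = 0, so two sub-ranges fit exactly; a 15-day range would leave a remainder, so add() bumps the count to three to cover the final day.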

Pseudo Code
If (EndIndex – StartIndex) mod TickInterval = 0
then GetDateDiff = (EndIndex – StartIndex) / TickInterval
else GetDateDiff = ((EndIndex – StartIndex) / TickInterval) + 1
Expression
@if(
    equals(
        mod(
            sub(
                ticks(pipeline().parameters.endIndex),
                ticks(pipeline().parameters.startIndex)
            ),
            int(variables('TickInterval'))
        ),
        0
    ),
    string(
        div(
            sub(
                ticks(pipeline().parameters.endIndex),
                ticks(pipeline().parameters.startIndex)
            ),
            int(variables('TickInterval'))
        )
    ),
    string(
        add(
            div(
                sub(
                    ticks(pipeline().parameters.endIndex),
                    ticks(pipeline().parameters.startIndex)
                ),
                int(variables('TickInterval'))
            ),
            1
        )
    )
)

Set Range

We want to create an Array whose number of indexes matches the number of sub-ranges. This will allow us to use a ForEach activity with an inner Append Variable activity. Using the function range(), starting at 0 and ending with the value of our variable GetDateDiff, we can create such an array.

Range(): Return an integer array that starts from a specified integer.

Expression
@range(0,int(variables('GetDateDiff')))
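Continuing the assumed example, a GetDateDiff of 2 yields [0,1]; if the remainder had forced GetDateDiff to 3, the expression would yield [0,1,2].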

Create Array of JSONs: ForEach activity

Now that we have an array with the number of indexes matching the number of sub-ranges we want, we can get into the real meat of things.

SPECIAL NOTE: If the order does not matter, to gain a huge boost in performance, disable the Sequential box and set Batch Count = 50 (default = 20, max = 50). We’ll cover this in more detail in Part 3 😉

Append New JSON

The moment we have all been waiting for! If you have made it this far, thank you for your patience as there was a lot of build up to this moment.

To create an Array of JSON objects, we want to create an empty array variable then use the Append Variable activity. To append a JSON object to this array, we need to leverage two functions, json() and concat().

JSON(): Return the JavaScript Object Notation (JSON) type value or object for a string or XML.

Concat(): Combine two or more strings, and return the combined string.

The main idea here is to concatenate your key:value pairs together and then cast the string as JSON. The string we create needs to follow correct JSON formatting: starting with '{', ending with '}', and wrapping inner string values in double quotes.

For example, to create an Array containing the JSON object {"FirstName":"Sean"}, we can use the following expression within an Append Variable activity:

@json(
    concat('{"FirstName": "Sean"}')
)
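After a single iteration, the array variable would contain [{"FirstName":"Sean"}], with the item stored as a JSON object rather than as the literal string '{"FirstName":"Sean"}'.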

In our scenario, the expression is much more complex than this example. To best explain what is happening, let’s look at some pseudo code:

Pseudo(ish) Code
If we are on the first sub-range (Range[i] = 0), then
    StartIndex = parameter StartIndex
Else
    StartIndex = (parameter StartIndex + (Range[i] * BucketIntervalInDays)) + interval offset
In both cases:
    If parameter StartIndex + ((Range[i] + 1) * BucketIntervalInDays) >= parameter EndIndex
    then EndIndex = parameter EndIndex
    else EndIndex = parameter StartIndex + ((Range[i] + 1) * BucketIntervalInDays)
Expression
@if(
    equals(item(), 0),
    json(
        concat(
            '{"startIndex":"',
            string(adddays(pipeline().parameters.startIndex, mul(item(), pipeline().parameters.BucketIntervalInDays))),
            '","endIndex":"',
            if(
                greaterOrEquals(
                    adddays(pipeline().parameters.startIndex, mul(add(item(), 1), pipeline().parameters.BucketIntervalInDays)),
                    pipeline().parameters.endIndex
                ),
                pipeline().parameters.endIndex,
                string(adddays(pipeline().parameters.startIndex, mul(add(item(), 1), pipeline().parameters.BucketIntervalInDays)))
            ),
            '"}'
        )
    ),
    json(
        concat(
            '{"startIndex":"',
            string(addseconds(adddays(pipeline().parameters.startIndex, mul(item(), pipeline().parameters.BucketIntervalInDays)), int(variables('IntervalDatePart')))),
            '","endIndex":"',
            if(
                greaterOrEquals(
                    adddays(pipeline().parameters.startIndex, mul(add(item(), 1), pipeline().parameters.BucketIntervalInDays)),
                    pipeline().parameters.endIndex
                ),
                pipeline().parameters.endIndex,
                string(adddays(pipeline().parameters.startIndex, mul(add(item(), 1), pipeline().parameters.BucketIntervalInDays)))
            ),
            '"}'
        )
    )
)
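To make this concrete, here is the Boundaries array this expression would produce under the assumed sample parameters used earlier: startIndex = 2024-01-01T00:00:00, endIndex = 2024-01-15T00:00:00, BucketIntervalInDays = 7, and a one-second interval offset (timestamps truncated for readability):

[{"startIndex":"2024-01-01T00:00:00","endIndex":"2024-01-08T00:00:00"},
{"startIndex":"2024-01-08T00:00:01","endIndex":"2024-01-15T00:00:00"}]

Note how the one-second offset on every sub-range after the first keeps the boundary timestamp 2024-01-08T00:00:00 from being processed twice.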

Set List of Date Ranges

For monitoring and debugging, when using Append Variable, I tend to follow up with a Set Variable activity that sets a second, empty Array variable to the value of the Array I appended to, just for an easy visual.


Conclusion

I have used this design in numerous production pipeline processes. In the next blog, we will compare and contrast the performance of using and not using sub-ranges by evaluating two examples: one retrieving data from an API and another retrieving data from an Azure SQL Database.

With a little bit of creativity, Data pipelines can solve a robust set of scenarios. I hope this blog helps spark a bit of creativity when you’re faced with your next challenge.

Thank you for reading! As always, please feel free to comment or ask questions. I’m always open to recommendations for additional blogs if you have Data pipeline topics that you would like to see covered in depth.
