Microsoft Fabric Updates Blog

Fabric Change the Game: Real-Time Analytics

As this series on Fabric continues, one of the pivotal facets to explore is Real-Time Analytics. It reduces complexity and streamlines data integration. Imagine a continuous stream of data, such as messages posted on a micro-blogging site or environmental measurements relayed by an internet-connected weather sensor. The essence of streaming data analytics is its ability to show how things change over time.

Microsoft Fabric allows you to build real-time streaming analytics with eventstream or Spark Stream. You can find more information about Spark Stream here: Streaming data into lakehouse – Microsoft Fabric | Microsoft Learn, and about eventstream here: Create and manage an eventstream in Microsoft Fabric – Microsoft Fabric | Microsoft Learn

This post is based on an experiment by https://www.linkedin.com/in/anshuldsharma/, published here: anshulsharmas/fabric-iss-demo: Monitoring International Space Station with Microsoft Fabric (github.com), and shared with me by my colleague Luca Ferrari.

The GitHub author ran an experiment tracking the International Space Station (ISS) using the Open Notify API documentation (open-notify.org), which describes an API that gives the current position of the ISS. Using Logic Apps, he pulls this information and sends it to eventstream, and from there to a Kusto database, where he queries it.

Using the same idea, the plan here is to show how this data can be fetched from a Spark notebook inside Fabric and then sent to eventstream.

You will also need to reference this doc: Stream real-time events from a custom app to a Microsoft Fabric KQL database – Microsoft Fabric | Microsoft Learn

Step by Step

Inside of the Real-time Analytics options in Fabric

  1. Real-Time Analytics – Create your KQL Database and eventstream as described in the doc above and as shown in Fig 1 – KQL and Fig 2 – Eventstream:
Fig 1 – KQL
Fig 2 – Eventstream

Inside of the eventstream in Fabric

2. Eventstream – Open eventstream and define your source as a custom app, as Fig 3 – Custom App shows:

Fig 3 – Custom App

3. Copy the primary key into a notepad and keep it there. The Python script will use it to connect to the eventstream and send the events, as can be seen in Fig 4 – Key:

Fig 4 – Key
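Before pasting the copied value into the script, it can help to check that it looks complete. The following is a minimal sketch, not part of the original demo: `parse_connection_string`, `missing_keys`, and `REQUIRED_KEYS` are hypothetical names, and the sample value is made up. It assumes the value has the `Key=Value;Key=Value` shape shown later in this post, and that `EntityPath` must be present because the script passes only the connection string to the client.

```python
# Hypothetical helper names; the segment names come from the connection string shown in this post.
REQUIRED_KEYS = ("Endpoint", "SharedAccessKeyName", "SharedAccessKey", "EntityPath")


def parse_connection_string(conn_str: str) -> dict:
    """Split 'Key=Value;Key=Value' segments into a dictionary."""
    parts = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        # Split on the first '=' only: key values may themselves end in '='.
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"Malformed segment: {segment!r}")
        parts[key.strip()] = value.strip()
    return parts


def missing_keys(conn_str: str) -> list:
    """Return the required segment names that are absent or empty."""
    parts = parse_connection_string(conn_str)
    return [key for key in REQUIRED_KEYS if not parts.get(key)]


# Made-up placeholder value, not a real endpoint or key.
sample = (
    "Endpoint=sb://example.servicebus.windows.net/;"
    "SharedAccessKeyName=key_demo;SharedAccessKey=abc123==;EntityPath=es_demo"
)
print(missing_keys(sample))  # prints []
```

If the list that comes back is not empty, the value copied from the eventstream is probably truncated or missing a segment.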

Inside of the Notebook in Fabric

4. Notebook – The Python script needs a few libraries, which you can pip-install from the notebook attached to the lakehouse. More information on how to use notebooks in Fabric is here: https://learn.microsoft.com/en-us/fabric/data-engineering/lakehouse-notebook-explore. Install the libraries:

pip install azure-eventhub
pip install apscheduler

More information about the libraries:

APScheduler · PyPI – Used in this example to send the events at a fixed interval. You do not need it if you keep sending the events in a different way; I am using it only for the purpose of this demo. As the library describes itself: “Python library that lets you schedule your Python code to be executed later, either just once or periodically. You can add new jobs or remove old ones on the fly as you please.”

azure-eventhub · PyPI – Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This lets you process and analyze the massive amounts of data produced by your connected devices and applications.
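If you would rather not install APScheduler, one different way to keep sending events is a plain loop. This is only a sketch under my own assumptions, not what the demo uses: `run_periodically` and its parameters are hypothetical names, and `max_runs` and `sleep` exist only so the loop can be stopped and tested.

```python
import time


def run_periodically(job, interval_seconds, max_runs=None, sleep=time.sleep):
    """Call job() repeatedly, pausing interval_seconds between calls.

    max_runs=None means run until interrupted. Returns the number of runs.
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        job()
        runs += 1
        # Skip the pause after the final run.
        if max_runs is None or runs < max_runs:
            sleep(interval_seconds)
    return runs


# Demo with a stand-in job; in the notebook the job would be the function
# that fetches the ISS position and sends it to the eventstream.
calls = []
run_periodically(lambda: calls.append("tick"), interval_seconds=0.01, max_runs=3)
print(len(calls))  # prints 3
```

Note that this loop waits for the full interval after each call finishes, so the time between sends is the interval plus the duration of the call.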

5. Let’s see how to make the API request. Here is an example with the requests library:

import requests

response = requests.get("http://api.open-notify.org/iss-now.json")

# Print the status code and the raw body of the response.
print(response.status_code)
print(response.content)

Results are shown in Fig 5 – Request Result:

Fig 5 – Request Result
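The response body is a small JSON document. As a hedged illustration of how it can be read in Python, the sketch below parses a sample payload in the shape described by the Open Notify documentation (a `message`, a Unix `timestamp`, and an `iss_position` object). The values are made up, and `parse_iss_position` is a hypothetical helper name, not part of the demo.

```python
import json
from datetime import datetime, timezone

# Sample payload in the documented shape; the values are made up.
SAMPLE = (
    '{"message": "success", "timestamp": 1700000000, '
    '"iss_position": {"latitude": "12.3456", "longitude": "-45.6789"}}'
)


def parse_iss_position(payload: str) -> dict:
    """Extract latitude, longitude and observation time from the JSON text."""
    data = json.loads(payload)
    if data.get("message") != "success":
        raise ValueError(f"Unexpected message field: {data.get('message')!r}")
    position = data["iss_position"]
    return {
        # float() accepts both numeric strings and numbers.
        "latitude": float(position["latitude"]),
        "longitude": float(position["longitude"]),
        # The timestamp is in Unix epoch seconds; convert it to an aware UTC datetime.
        "observed_at": datetime.fromtimestamp(data["timestamp"], tz=timezone.utc),
    }


result = parse_iss_position(SAMPLE)
print(result["latitude"], result["longitude"])  # prints 12.3456 -45.6789
```

With a live call, `response.text` could be passed to the same function in place of `SAMPLE`.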

6. Add the request inside a function, def send_iss_data, and use APScheduler to execute it and send events every 5 seconds:

scheduler.add_job(send_iss_data, 'interval', seconds=5)

Inside the Python script below, find and replace the endpoint configuration with the one that you extracted from the eventstream in step 3 and copied into a notepad: “Endpoint=sb://…./;SharedAccessKeyName=key_3….;SharedAccessKey=E…”

## Libraries
import json

import requests
from apscheduler.schedulers.blocking import BlockingScheduler
from azure.eventhub import EventData, EventHubProducerClient


## Function
def send_iss_data():
    try:
        # Get the latest ISS position
        response = requests.get("http://api.open-notify.org/iss-now.json")

        if response.status_code == 200:
            # Convert the ISS data to a JSON string
            iss_data = response.json()
            iss_data_json = json.dumps(iss_data)

            # Replace with the connection string copied from the eventstream in step 3
            eventhub_connection_str = (
                "Endpoint=sb://..../;SharedAccessKeyName=key_3....;SharedAccessKey=E....;EntityPath=...."
            )

            producer = EventHubProducerClient.from_connection_string(eventhub_connection_str)

            # Add the event data to a batch and send the batch to the event hub
            with producer:
                event_data_batch = producer.create_batch()
                event_data_batch.add(EventData(body=iss_data_json))
                producer.send_batch(event_data_batch)

            print("Data sent to Azure Event Hub successfully.")

        # Report the failure if the ISS data could not be retrieved
        else:
            print(f"Failed to retrieve ISS data. Status code: {response.status_code}")

    except Exception as e:
        print(f"Error: {str(e)}")


################### Executing the Function ###########################

# Set up a scheduler to run every 5 seconds
scheduler = BlockingScheduler()
scheduler.add_job(send_iss_data, 'interval', seconds=5)

try:
    scheduler.start()
except (KeyboardInterrupt, SystemExit):
    print("Scheduler stopped.")

Just hit the run button and the events should start to be sent to the eventstream as you can see in Fig 6 Spark Events.

Now let’s get back to the eventstream.

Fig 6 Spark Events

Back to the eventstream in Fabric

7 – Eventstream – Add the destination of the data: KQL, to complete the example proposed by Anshul Sharma – Fig 7 – KQL

Fig 7 – KQL

8 – Select Direct Ingestion and add the KQL database we created in Step 1. Fig 8 – Adding:

Fig 8 – Adding

9 – The format sent is JSON, so create a new table with the JSON format. Do not forget to check the data types for that table and to set the JSON nested levels to two. Fig 9 – Json:

Fig 9 – Json
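To see why the nested level matters, here is a rough Python illustration of the idea. It is my own sketch of flattening, not the actual logic of the ingestion wizard: `flatten` and its `levels` parameter are hypothetical, and the sample values are made up. With two levels, the nested `iss_position` object becomes the separate columns `iss_position_latitude` and `iss_position_longitude` that the KQL query later in this post projects.

```python
def flatten(record: dict, levels: int = 2, sep: str = "_") -> dict:
    """Flatten nested dictionaries down to the given number of levels."""
    flat = {}

    def walk(obj, prefix, depth):
        for key, value in obj.items():
            name = f"{prefix}{sep}{key}" if prefix else key
            if isinstance(value, dict) and depth < levels:
                # Still within the allowed depth: expand the nested object.
                walk(value, name, depth + 1)
            else:
                # Leaf value, or too deep: keep the value as a single column.
                flat[name] = value

    walk(record, "", 1)
    return flat


# Made-up sample in the shape returned by the ISS position API.
sample_event = {
    "message": "success",
    "timestamp": 1700000000,
    "iss_position": {"latitude": "12.3456", "longitude": "-45.6789"},
}

print(sorted(flatten(sample_event)))
# prints ['iss_position_latitude', 'iss_position_longitude', 'message', 'timestamp']
```

With `levels=1`, `iss_position` would stay a single column holding the whole nested object, and the latitude and longitude columns would not exist.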

10. I also added the Lakehouse as a destination for the ingestion as an example. Adding the Lakehouse is optional. If you added it too, at this point you should have something similar to Fig 10 – eventstream:

Fig 10 – eventstream

Query the KQL Database

11. KQL – Now the cool part! Open your KQL Queryset and point it to the KQL database created in Step 1. You can query using the following example, taken from the GitHub repository mentioned above. The KQL queryset is one of the Real-Time Analytics options, as you can see in Fig 11 – KQuerySet, and the results can be seen in Fig 12 – Kqlquery:

// ISS complete orbit
TABLENAME
| project iss_position_latitude, iss_position_longitude, timestamp
| render scatterchart with ( kind=map )
Fig 11 – KQuerySet
Fig 12 – Kqlquery

Back to the eventstream in Fabric

12. Eventstream – Now let’s use the eventstream to send a notification every time the longitude is between 20 and 30, or maybe when the ISS is in a range close to where you live; your choice. Back in the eventstream, create a new destination for Reflex.

Add a new destination: Reflex – Fig 13 reflex

Fig 13 reflex

12 – Open your workspace and list the objects inside it. Look for the Reflex item that you just created – Fig 14 Workspace_Reflex:

Fig 14 Workspace_Reflex

Inside of Reflex/ Data Activator in Fabric – <Preview>

13 – Reflex – Create a new object in design mode to configure the trigger for some actions – Fig 15 newObject:

Fig 15 newObject

14 – Longitude will be the option chosen since the goal is to be notified every time the Longitude is in a certain range – Fig 16 Longitude:

Fig 16 Longitude

15 – The next step is the action configuration – Fig 17 Action. Configure the Range or if you want a different action, use the filter to define it:

Fig 17 Action
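The range condition itself is simple. If you want to check locally which events should fire the notification, the logic can be mirrored in a few lines of Python. This is only a sketch of the condition, not how Data Activator evaluates it: `in_longitude_range` is a hypothetical helper, the 20 to 30 bounds come from step 12, and the event is assumed to carry the flattened `iss_position_longitude` column.

```python
def in_longitude_range(event: dict, low: float = 20.0, high: float = 30.0) -> bool:
    """Return True when the event's longitude falls inside [low, high]."""
    # The longitude may arrive as a string, so convert it before comparing.
    longitude = float(event["iss_position_longitude"])
    return low <= longitude <= high


# Made-up events for illustration.
events = [
    {"iss_position_longitude": "25.1"},
    {"iss_position_longitude": "-72.3"},
    {"iss_position_longitude": "30.0"},
]
print([in_longitude_range(e) for e in events])  # prints [True, False, True]
```

Changing `low` and `high` to a range around where you live gives the other variant mentioned in step 12.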

16 – Once the action is saved, you can start the trigger and also send a test to check the results, for example – Fig 18 Example:

Fig 18 Example

Some references for Data Activator:

https://learn.microsoft.com/en-us/fabric/data-activator/data-activator-assign-data-objects

https://learn.microsoft.com/en-us/fabric/data-activator/data-activator-create-triggers-design-mode

https://learn.microsoft.com/en-us/fabric/data-activator/data-activator-detection-conditions

https://learn.microsoft.com/en-us/fabric/data-activator/data-activator-limitations

Summary: Using the example provided by Anshul Sharma, Senior Program Manager at Microsoft, available on GitHub at anshulsharmas/fabric-iss-demo: Monitoring International Space Station with Microsoft Fabric (github.com), we implemented an end-to-end solution within Fabric. A Python script in a notebook pulled the International Space Station (ISS) position from the Open Notify API and sent it to the eventstream, which forwarded it to a KQL (Kusto Query Language) database. By querying this database, we generated a map of the ISS positions. Last but not least, we also configured notifications using Data Activator.
