Upload a small data file through Azure Event Hub

This tutorial provides a sample code solution in Python to send data through Azure Event Hub. You can explore other solutions to connect data to Wizata.

This example scenario uploads a CSV data file to Wizata. You can adapt the logic to your needs.

🚧

Please use this logic only for small data files; for a full import, write to the Time-Series DB directly.

📘

You can find this tutorial's sample code on our GitHub as sample-02.

Sample Data File

Start by downloading our sample data file from GitHub, or use your own. The file contains 24 hours of data for multiple time-series at a 15-second frequency. It is used throughout the tutorials in this documentation.

Setup your environment

Use a notebook or your preferred Python IDE and set up an environment with Python 3.9 (recommended).

Please install the following requirements:

pandas==1.5.3
azure-eventhub==5.11.2

Establish connection to Azure Event Hub

To connect to Azure Event Hub, you will need two environment variables containing the Azure Event Hub connection string and the hub name.

    azure_producer = EventHubProducerClient.from_connection_string(
        os.environ['HUB_CS'],
        eventhub_name=os.environ['HUB_NAME']
    )
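
The connection string and the hub name usually come from your Azure Event Hub configuration. For local testing only, here is a minimal sketch that sets both variables with placeholder values before creating the client; the values shown are not real and must be replaced with your own credentials:

import os

# Placeholders for local testing only; replace with your own Event Hub credentials.
os.environ.setdefault('HUB_CS', 'Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>')
os.environ.setdefault('HUB_NAME', '<your-event-hub-name>')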

Send Messages

To send messages you can use this simple function, which packs a group of messages into a single batch. If required, you can split your messages into smaller batches; we recommend 500 messages per batch (a batching sketch follows the function below).

def send(messages, producer):
    try:
        # group all messages of this call into a single Event Hub batch
        event_data_batch = producer.create_batch()
        for message in messages:
            # serialize each message to JSON and add it as an event to the batch
            json_data = json.dumps(message, default=str)
            event_data = EventData(json_data.encode('utf-8'))
            event_data_batch.add(event_data)

        with producer:
            producer.send_batch(event_data_batch)

        print(f' * sent {len(messages)} message(s) to Wizata')
    except Exception as e:
        print(f' * an error occurred sending data to Wizata {e}')
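
If you need the batching mentioned above, here is a minimal sketch that splits a prepared list of messages into groups of 500 before calling send(); the chunks() helper, the batch_size value and the all_messages list are illustrative names, not part of the sample:

def chunks(messages, batch_size=500):
    # yield successive slices of at most batch_size messages
    for start in range(0, len(messages), batch_size):
        yield messages[start:start + batch_size]

# for batch in chunks(all_messages):
#     send(batch, producer=azure_producer)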

Transform and format your data

Write the transformation logic to convert a row of your dataframe to our format. As a reminder, the data you send should have this JSON structure:

{
	"Timestamp": "2023-10-01T09:00:58Z", 
	"HardwareId": "your_tag_id", 
	"SensorValue": 15.5412
}

Timestamp is an ISO-formatted string, HardwareId is the unique ID of your sensor, and SensorValue is a float.
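
As a quick sanity check, serializing one such message with json.dumps, the same call used by the send function above, produces the expected payload:

import json

message = {
    'Timestamp': '2023-10-01T09:00:58Z',
    'HardwareId': 'your_tag_id',
    'SensorValue': 15.5412
}
print(json.dumps(message, default=str))
# {"Timestamp": "2023-10-01T09:00:58Z", "HardwareId": "your_tag_id", "SensorValue": 15.5412}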

To convert a dataframe row into this format, you can for example use this function:

def row_to_dict_list(row):
    data_list = []
    for column in row.index:
        # skip the time-related helper columns; every other column is treated as a sensor tag
        if column not in ['timestamp', 'time', 'time_seconds', 'time_diff']:
            value = row[column]
            if not pd.isna(value):
                # the row timestamp applies to every sensor value in that row
                timestamp = row['timestamp'].isoformat()
                data_dict = {
                    'Timestamp': timestamp,
                    'HardwareId': column,
                    'SensorValue': value
                }
                data_list.append(data_dict)

    return data_list
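
As an illustration, here is what this function returns for a single row of a small made-up dataframe with one sensor column (motor_speed is just an example tag name):

import pandas as pd

sample = pd.DataFrame({
    'timestamp': pd.to_datetime(['2023-10-01T09:00:58Z'], utc=True),
    'motor_speed': [15.5412]
})

print(row_to_dict_list(sample.iloc[0]))
# [{'Timestamp': '2023-10-01T09:00:58+00:00', 'HardwareId': 'motor_speed', 'SensorValue': 15.5412}]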

Read and loop over your file rows

Then you simply need to loop over your file, e.g. using this logic:

    # read the csv and prepare the timestamp
    df = pd.read_csv('motors_generic_dataset.csv', parse_dates=True)
    df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True)

    for idx in range(len(df)):
        row_messages = row_to_dict_list(df.iloc[idx])
        send(row_messages, producer=azure_producer)

The complete code should look like this; don't hesitate to adapt it to your needs:

import pandas as pd
from azure.eventhub import EventData, EventHubProducerClient
import os
import json


def send(messages, producer):
    try:
        # group all messages of this call into a single Event Hub batch
        event_data_batch = producer.create_batch()
        for message in messages:
            # serialize each message to JSON and add it as an event to the batch
            json_data = json.dumps(message, default=str)
            event_data = EventData(json_data.encode('utf-8'))
            event_data_batch.add(event_data)

        with producer:
            producer.send_batch(event_data_batch)

        print(f' * sent {len(messages)} message(s) to Wizata')
    except Exception as e:
        print(f' * an error occurred sending data to Wizata {e}')


def row_to_dict_list(row):
    data_list = []
    for column in row.index:
        # skip the time-related helper columns; every other column is treated as a sensor tag
        if column not in ['timestamp', 'time', 'time_seconds', 'time_diff']:
            value = row[column]
            if not pd.isna(value):
                # the row timestamp applies to every sensor value in that row
                timestamp = row['timestamp'].isoformat()
                data_dict = {
                    'Timestamp': timestamp,
                    'HardwareId': column,
                    'SensorValue': value
                }
                data_list.append(data_dict)

    return data_list


def time_to_seconds(t):
    # helper to convert a time of day to seconds (not used in this example)
    return t.hour * 3600 + t.minute * 60 + t.second


if __name__ == '__main__':

    # connect to your hub
    azure_producer = EventHubProducerClient.from_connection_string(
        os.environ['HUB_CS'],
        eventhub_name=os.environ['HUB_NAME']
    )

    # read the csv and prepare the timestamp
    df = pd.read_csv('motors_generic_dataset.csv', parse_dates=True)
    df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True)

    for idx in range(len(df)):
        row_messages = row_to_dict_list(df.iloc[idx])
        send(row_messages, producer=azure_producer)

    print('completed')