Stream and loop on a data file through Azure Event Hub

This tutorial provides a sample code solution in Python to send data through Azure Event Hub. You can explore other solutions to connect data to Wizata.

This example scenario streams a CSV data file containing 24 hours of machine data to Wizata. You can adapt the logic to your needs.

To upload a sample data file just once instead, please check Upload a small one shot data file through Azure Event Hub.

📘

You can find this tutorial's sample code on our GitHub as sample-01.

Sample Data File

Start by downloading our sample data file from GitHub or use your own. The file contains 24 hours of data for multiple time-series at a 15-second frequency. It is used throughout the tutorials in this documentation.
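
If you want a quick look at the file before streaming it, a short pandas check is enough. This is a minimal sketch; it assumes the file is saved as motors_generic_dataset.csv, the name used in the code below:

import pandas as pd

# quick inspection: a 'timestamp' column plus one column per time-series
df = pd.read_csv('motors_generic_dataset.csv', parse_dates=['timestamp'])
print(df.head())
print(df['timestamp'].diff().value_counts().head())  # should show 15-second steps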

Set up your environment

Use a notebook or your preferred Python IDE and set up an environment with Python 3.9 (recommended).

Please install the following requirements:

pandas==1.5.3
azure-eventhub==5.11.2

Establish connection to Azure Event Hub

To connect to Azure Event Hub you will need two environment variables containing the Azure Event Hub connection string and the hub name.

import os
from azure.eventhub import EventHubProducerClient

# create a producer client from the connection string and hub name
azure_producer = EventHubProducerClient.from_connection_string(
    os.environ['HUB_CS'],
    eventhub_name=os.environ['HUB_NAME']
)
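
To verify the connection, you can push a single test message in the format used by the complete example below (Timestamp, HardwareId, SensorValue). This is a minimal sketch; 'my-sensor' and the value are placeholders:

import json
from datetime import datetime, timezone
from azure.eventhub import EventData

# build one test message; 'my-sensor' and 42.0 are placeholders
test_message = {
    'Timestamp': datetime.now(timezone.utc).isoformat(),
    'HardwareId': 'my-sensor',
    'SensorValue': 42.0
}

# pack the message into a batch and send it
batch = azure_producer.create_batch()
batch.add(EventData(json.dumps(test_message, default=str).encode('utf-8')))
azure_producer.send_batch(batch)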

Send Messages

To send messages you can use this simple function; it packs a group of messages into a single batch. If required, you can split your messages into smaller batches; we recommend 500 messages per batch (see the sketch after the function).

import json
from azure.eventhub import EventData


def send(messages, producer):
    try:
        # pack all messages into a single batch
        event_data_batch = producer.create_batch()
        for message in messages:
            json_data = json.dumps(message, default=str)
            event_data = EventData(json_data.encode('utf-8'))
            event_data_batch.add(event_data)

        # send the batch without closing the producer,
        # so it can be reused on subsequent calls
        producer.send_batch(event_data_batch)

        print(f' * sent {len(messages)} message(s) to Wizata')
    except Exception as e:
        print(f' * an error occurred sending data to Wizata {e}')
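
If you have more messages than you want to send at once, you can chunk the list before calling send(). The helper below is a minimal sketch of the 500-messages-per-batch recommendation; send_in_batches is not part of the sample code:

# hypothetical helper illustrating the 500-messages-per-batch
# recommendation; it simply chunks the list and reuses send()
def send_in_batches(messages, producer, batch_size=500):
    for start in range(0, len(messages), batch_size):
        send(messages[start:start + batch_size], producer)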

Synchronising time

Here is a complete example that runs in a loop to synchronise the current UTC time with the file. After opening the file and converting it to a pandas dataframe, it finds the row closest to the current UTC time, as the file contains data at a 15-second frequency. It then sleeps until the next 15-second interval, pushes the right data to Wizata through Azure Event Hub, and continues to the next row, looping over the file every 24 hours.

import pandas as pd
import time
from datetime import datetime, timezone, timedelta
from azure.eventhub import EventData, EventHubProducerClient
import os
import json


def send(messages, producer):
    try:
        # pack all messages into a single batch
        event_data_batch = producer.create_batch()
        for message in messages:
            json_data = json.dumps(message, default=str)
            event_data = EventData(json_data.encode('utf-8'))
            event_data_batch.add(event_data)

        # send the batch without closing the producer,
        # so it can be reused on the next call
        producer.send_batch(event_data_batch)

        print(f' * sent {len(messages)} message(s) to Wizata')
    except Exception as e:
        print(f' * an error occurred sending data to Wizata {e}')


def row_to_dict_list(row, current_date):
    # convert one dataframe row into a list of messages,
    # one per time-series column, skipping helper columns and NaN values
    data_list = []
    for column in row.index:
        if column not in ['timestamp', 'time', 'time_seconds', 'time_diff']:
            value = row[column]
            if not pd.isna(value):
                # re-anchor the timestamp on the current date, keeping the time of day
                timestamp = row['timestamp'].replace(year=current_date.year, month=current_date.month, day=current_date.day)
                timestamp = timestamp.isoformat()
                data_dict = {
                    'Timestamp': timestamp,
                    'HardwareId': column,
                    'SensorValue': value
                }
                data_list.append(data_dict)

    return data_list


def time_to_seconds(t):
    # convert a time of day to the number of seconds since midnight
    return t.hour * 3600 + t.minute * 60 + t.second


if __name__ == '__main__':

    # connect to your hub
    azure_producer = EventHubProducerClient.from_connection_string(
        os.environ['HUB_CS'],
        eventhub_name=os.environ['HUB_NAME']
    )

    # read the csv and prepare helper columns for the time of day
    df = pd.read_csv('motors_generic_dataset.csv', parse_dates=['timestamp'])
    df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True)
    df['time'] = df['timestamp'].dt.time
    df['time_seconds'] = df['time'].apply(time_to_seconds)

    # wait until the next 15-second boundary to align with the file frequency
    next_interval = 15 - (datetime.now().second % 15)
    if next_interval == 15:
        next_interval = 0

    print(f'Waiting {next_interval} seconds to synchronize...')
    time.sleep(next_interval)

    while True:
        start_time = datetime.now(timezone.utc)
        current_utc_time = start_time
        current_date = current_utc_time.date()
        current_time_seconds = time_to_seconds(current_utc_time.time())

        # find the row whose time of day is closest to the current UTC time
        df['time_diff'] = (df['time_seconds'] - current_time_seconds).abs()
        closest_idx = df['time_diff'].idxmin()

        # if that row is slightly in the future, step back to the previous row
        if df.at[closest_idx, 'time_seconds'] > current_time_seconds:
            closest_idx = max(0, closest_idx - 1)

        for idx in range(closest_idx, len(df)):
            row = df.iloc[idx]
            row_messages = row_to_dict_list(row, current_date)

            # each row is expected 15 seconds after the previous one
            expected_time = start_time + timedelta(seconds=(idx - closest_idx) * 15)
            send(row_messages, producer=azure_producer)

            # sleep until the next expected 15-second slot
            sleep_time = (expected_time - datetime.now(timezone.utc)).total_seconds()
            if sleep_time > 0:
                time.sleep(sleep_time)

        # end of file reached: reload it and start a new 24-hour pass
        print("Restarting dataset loop...")
        df = pd.read_csv('motors_generic_dataset.csv', parse_dates=['timestamp'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True)
        df['time'] = df['timestamp'].dt.time
        df['time_seconds'] = df['time'].apply(time_to_seconds)

You can adapt the code to your needs: sending data as you receive it, importing historical data as a one shot, or using different frequencies and data transformations.
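
For example, a one-shot historical import could skip the time synchronisation and keep the original timestamps. The sketch below reuses send() and row_to_dict_list() from the example above; it is one possible adaptation, not part of the sample:

# sketch of a one-shot historical import: keep original timestamps by
# passing each row's own date, so row_to_dict_list() leaves them unchanged
df = pd.read_csv('motors_generic_dataset.csv', parse_dates=['timestamp'])
df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True)

messages = []
for _, row in df.iterrows():
    messages.extend(row_to_dict_list(row, row['timestamp'].date()))

# send in batches of 500 messages, as recommended earlier
for start in range(0, len(messages), 500):
    send(messages[start:start + 500], azure_producer)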