Tutorial: Anomaly Detection Solution

In this tutorial, we show you step by step how to build an anomaly detection solution for the vibration data collected from the bearing sensors of two motors. The purpose is to monitor the sensor data continuously and identify any unusual patterns that might indicate motor wear, bearing failure, or other operational issues.

We are going to use an Isolation Forest model, a machine learning technique specifically designed to isolate anomalies by identifying data points that behave differently from the majority. The model processes the sensor data to detect deviations, enabling us to identify potential issues early and take corrective action before they result in equipment failure.
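
To illustrate the idea outside the platform, here is a minimal standalone sketch of how an Isolation Forest labels observations, using scikit-learn and synthetic data; the values and parameters below are illustrative assumptions and are not part of the tutorial setup:

import numpy as np
from sklearn.ensemble import IsolationForest

# synthetic vibration-like data: mostly normal readings plus a few unusually high ones
rng = np.random.default_rng(42)
normal = rng.normal(loc=1.0, scale=0.1, size=(500, 4))   # four "bearing" channels
outliers = rng.normal(loc=3.0, scale=0.5, size=(5, 4))   # abnormal vibration levels
data = np.vstack([normal, outliers])

# contamination is the expected fraction of anomalies in the data
iso_forest = IsolationForest(contamination=0.01, random_state=42).fit(data)

labels = iso_forest.predict(data)  # -1 = anomaly, 1 = normal
print("anomalies detected:", int((labels == -1).sum()))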

By running this anomaly detection model, we aim to improve the reliability and performance of the motors by predicting failures and scheduling maintenance more effectively. The objective is to calculate, in real time, the probability of encountering an anomaly on our motors. We would also like a single solution to edit and maintain for all our similar motors.

Prerequisites

The tutorial focuses on a motor's digital twin, specifically its bearing components, based on our sample data representing the vibration sensors and the associated digital twin.

Your environment should have the sample data flowing (mt1_bearing1, mt1_bearing2, ..., mt2_bearing4), and your twin structure should have all sensors attached to their respective bearings on motor one and motor two:

If that is not already the case, please check the following articles to set up those prerequisites.

Create a Template

The first step involves creating a Template with properties such as datapoints, JSON objects, floats, etc. The template helps organise everything in one place.

Since we have two motors to apply the anomaly detection model, the template allows us to use a single pipeline to apply the model to both motors efficiently. A template named "my_example_factory_template" could include the following:

  • Datapoints: Bearing1, Bearing2, Bearing3, Bearing4, bearing_anomaly
  • Variable: Threshold
  • JSON: Bearing_properties

Using the Python Toolkit, you can register a template using the following logic:

import wizata_dsapi

wizata_dsapi.api().upsert_template(
    key="my_example_factory_template",
    name="My Example Factory Template"
)

wizata_dsapi.api().add_template_property(
    template='my_example_factory_template',
    property_name='Bearing1',
    property_type='datapoint'
)

wizata_dsapi.api().add_template_property(
    template='my_example_factory_template',
    property_name='Bearing2',
    property_type='datapoint'
)

wizata_dsapi.api().add_template_property(
    template='my_example_factory_template',
    property_name='Bearing3',
    property_type='datapoint'
)

wizata_dsapi.api().add_template_property(
    template='my_example_factory_template',
    property_name='Bearing4',
    property_type='datapoint'
)

wizata_dsapi.api().add_template_property(
    template='my_example_factory_template',
    property_name='bearing_anomaly',
    property_type='datapoint'
)

wizata_dsapi.api().add_template_property(
    template='my_example_factory_template',
    property_name='Threshold',
    property_type='float'
)

wizata_dsapi.api().add_template_property(
    template='my_example_factory_template',
    property_name='Bearing_properties',
    property_type='json'
)

For the anomaly detection model, we have to create a logical datapoint for each motor ("mt1_bearing_anomaly" and "mt2_bearing_anomaly"). These datapoints must exist in the platform before we register the twins so that we can map them:

datapoint = wizata_dsapi.DataPoint(hardware_id="mt1_bearing_anomaly", name="mt1_bearing_anomaly")
wizata_dsapi.api().create(datapoint)

datapoint = wizata_dsapi.DataPoint(hardware_id="mt2_bearing_anomaly", name="mt2_bearing_anomaly")
wizata_dsapi.api().create(datapoint)

Once your template is created, the next step is to register a Twin. Each asset will have fixed values for variables and/or mapped datapoints. When querying a registered Twin based on a template, there is no need to know the specific details of each datapoint.

To register the Twin, you must create a mapping between the template property names and their corresponding values:

wizata_dsapi.api().register_twin(
    template="my_example_factory_template",
    twin="motor_1",
    properties={
        "Bearing1": "mt1_bearing1",
        "Bearing2": "mt1_bearing2",
        "Bearing3": "mt1_bearing3",
        "Bearing4": "mt1_bearing4",
        "bearing_anomaly": "mt1_bearing_anomaly",
        "Threshold": 3.1,
        "Bearing_properties": {
            "bearing_pressure_threshold": 89.5,
            "bearing_temperature_threshold": 67
        }
    }
)

We register motor two in the same way.

wizata_dsapi.api().register_twin(
    template="my_example_factory_template",
    twin="motor_2",
    properties={
        "Bearing1": "mt2_bearing1",
        "Bearing2": "mt2_bearing2",
        "Bearing3": "mt2_bearing3",
        "Bearing4": "mt2_bearing4",
        "bearing_anomaly": "mt2_bearing_anomaly",
        "Threshold": 3.8,
        "Bearing_properties": {
            "bearing_pressure_threshold": 88.5,
            "bearing_temperature_threshold": 66
        }
    }
)

You can also create a template directly from the UI. Please see this article to create the template manually:

At the end of this process, we have a template that includes two Digital Twins, Motor 1 and Motor 2, as shown below:


Create a Pipeline

After creating the template, the next step is to develop a Pipeline to integrate the solution. A pipeline consists of a series of steps necessary for the project, including querying, transformation, machine learning (ML) models, plots, and writing results.

import uuid
# Generate the pipeline
pipeline = wizata_dsapi.Pipeline(  # create a pipeline
    key="bearing_anomaly_pipeline",
    # the template id comes from the end of the template's link in the platform
    template_id=uuid.UUID("61d63f8b-6142-43bc-bb98-ac146076601b")
)

Query

The first step of the pipeline is a query:

pipeline.add_query(  # query step
    wizata_dsapi.Request(
        datapoints=["Bearing1", "Bearing2", "Bearing3", "Bearing4"],
        start="now-1d",
        end="now",
        agg_method="mean",
        interval=60000
    ),
    df_name="df"
)

Transformation

For the anomaly detection solution, you will create the transformation, model, and plot scripts one by one. The process begins with the transformation script:

def bearing_preprocessing(context: wizata_dsapi.Context):
    df = context.dataframe  # the dataframe coming from the query step of the pipeline
    df = df[(df > 0.05).any(axis=1)]  # drop downtimes: keep rows where at least one bearing reads above 0.05
    df = df.dropna()  # drop null values
    return df

wizata_dsapi.api().upsert(bearing_preprocessing)  # saves the transformation script into the platform

pipeline.add_transformation(  # script step
    script="bearing_preprocessing",
    input_df_names=['df'],
    output_df_names=['df_processed']
)

Training Model

The transformation script removes downtimes and eliminates null values, which is the first step in preparing the data for the machine learning model.
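
As a quick illustration of what this filtering does, here is a small standalone pandas example; the values below are made up for demonstration and are not part of the sample data:

import pandas as pd
import numpy as np

# toy dataframe: one downtime row (all readings at or below 0.05) and two rows with missing values
df = pd.DataFrame({
    "Bearing1": [0.98, 0.01, 1.02, np.nan],
    "Bearing2": [1.01, 0.02, 0.97, 1.00],
    "Bearing3": [0.99, 0.03, 1.05, 0.98],
    "Bearing4": [1.03, 0.01, np.nan, 1.01],
})

df = df[(df > 0.05).any(axis=1)]  # drops the downtime row, where no bearing reads above 0.05
df = df.dropna()                  # drops the rows that still contain null values
print(df)                         # only the first, fully valid row remains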

The next step is creating the model script:

def anomaly_detection_model(context: wizata_dsapi.Context):
    df = context.dataframe
    from sklearn.ensemble import IsolationForest

    # Create and fit the model
    iso_forest = IsolationForest(contamination=0.01, random_state=42)  # adjust contamination based on your data
    iso_forest = iso_forest.fit(df)
    context.set_model(iso_forest, df.columns)

wizata_dsapi.api().upsert(anomaly_detection_model)

pipeline.add_model(
    config=wizata_dsapi.MLModelConfig(
        train_script='anomaly_detection_model',  # the model script
        features=["Bearing1", "Bearing2", "Bearing3", "Bearing4"],  # input features for the model
        output_append=True,  # append the prediction results to the input dataframe
        output_columns_names=["bearing_anomaly"],  # name of the prediction output column, filled with model.predict(df[features])
        function="predict",  # can be transform, predict, etc.
        model_key="ad_model",  # the name of the model as saved in the platform
        by_twin=True  # save the model separately per twin, e.g. ad_model.motor_1, ad_model.motor_2
    ),
    input_df='df_processed',
    output_df='df_predict'
)

For the model step, we have two twins, and we need to run the pipeline for both. This will result in two separate models. If we set by_twin to True, it means the two models will be saved separately, named as ad_model.motor_1 and ad_model.motor_2.

After the model step, df_predict will have the columns ["Bearing1", "Bearing2", "Bearing3", "Bearing4", "bearing_anomaly"].
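
Conceptually, the model step in predict mode behaves like the following standalone sketch; the dataframe here is placeholder data and this is an illustration of the appending behaviour, not the platform's actual implementation:

import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest

features = ["Bearing1", "Bearing2", "Bearing3", "Bearing4"]

# placeholder data standing in for df_processed
rng = np.random.default_rng(0)
df_processed = pd.DataFrame(rng.normal(1.0, 0.1, size=(100, 4)), columns=features)

# fit as in anomaly_detection_model, then append predictions as a new column,
# which is what output_append=True with output_columns_names=["bearing_anomaly"] produces
iso_forest = IsolationForest(contamination=0.01, random_state=42).fit(df_processed)
df_predict = df_processed.copy()
df_predict["bearing_anomaly"] = iso_forest.predict(df_processed[features])  # -1 = anomaly, 1 = normal
print(df_predict.columns.tolist())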

Plot

Next, we will create a plot script to visualise anomalies by marking them with red vertical lines. This step is intended for experimentation purposes only:

def bearing_ad_plot(context:wizata_dsapi.Context):
    df = context.dataframe
    import plotly.graph_objects as go

    # Create the line plot
    fig = go.Figure()

    # Add traces for each sensor
    for column in df.columns[:-1]:  # plot each bearing column; the timestamp is the index, so only exclude the 'bearing_anomaly' column
        fig.add_trace(go.Scatter(x=df.index, y=df[column], mode='lines', name=column))

    # Add vertical lines for anomalies
    anomaly_times = df[df['bearing_anomaly'] == -1].index
    for anomaly_time in anomaly_times:
        fig.add_vline(x=anomaly_time, line=dict(color='red', width=2))

    # Update layout
    fig.update_layout(
        title="Sensor Data with Anomaly Detection",
        xaxis_title="Timestamp",
        yaxis_title="Sensor Values",
        showlegend=True
    )
    
    context.set_plot(fig, "Bearing Anomaly Detection") #save with the title
wizata_dsapi.api().upsert(bearing_ad_plot)
pipeline.add_plot(df_name="df_predict", script="bearing_ad_plot") # plot step

Write

The final step is to write the results to the platform. For this use case, the result to be written is the "bearing_anomaly."

pipeline.add_writer( #writer step
    config=wizata_dsapi.WriteConfig(
        datapoints={
            "bearing_anomaly":"bearing_anomaly"
        }
    ),
    input_df='df_predict'
)

After completing all these steps, the next action is to upsert the pipeline to the platform.

wizata_dsapi.api().upsert_pipeline(pipeline)

Please see all the steps for the pipeline here:

import uuid
# Generate the pipeline
pipeline = wizata_dsapi.Pipeline(  # create a pipeline
    key="bearing_anomaly_pipeline",
    # the template id comes from the end of the template's link in the platform
    template_id=uuid.UUID("61d63f8b-6142-43bc-bb98-ac146076601b")
)

pipeline.add_query(  # query step
    wizata_dsapi.Request(
        datapoints=["Bearing1", "Bearing2", "Bearing3", "Bearing4"],
        start="now-1d",
        end="now",
        agg_method="mean",
        interval=60000
    ),
    df_name="df"
)

pipeline.add_transformation( # script step
    script="bearing_preprocessing",
    input_df_names=['df'],
    output_df_names=['df_processed']
)

pipeline.add_model(
    config=wizata_dsapi.MLModelConfig(
        train_script='anomaly_detection_model',  # the model script
        features=["Bearing1", "Bearing2", "Bearing3", "Bearing4"],  # input features for the model
        output_append=True,  # append the prediction results to the input dataframe
        output_columns_names=["bearing_anomaly"],  # name of the prediction output column, filled with model.predict(df[features])
        function="predict",  # can be transform, predict, etc.
        model_key="ad_model",  # the name of the model as saved in the platform
        by_twin=True  # save the model separately per twin, e.g. ad_model.motor_1, ad_model.motor_2
    ),
    input_df='df_processed',
    output_df='df_predict'
)

pipeline.add_plot(df_name="df_predict", script="bearing_ad_plot") # plot step

pipeline.add_writer( #writer step
    config=wizata_dsapi.WriteConfig(
        datapoints={
            "bearing_anomaly":"bearing_anomaly"
        }
    ),
    input_df='df_predict'
)

wizata_dsapi.api().upsert_pipeline(pipeline)

This is what the pipeline looks like in the platform:

Testing and Deploying the Pipeline

Once the pipeline is in the platform, the next step is to test whether it runs correctly. To do this, we will run the pipeline in Experiment mode. There are two ways to run a pipeline: Experiment mode and Production mode. The difference between them is as follows:

  • Experiment mode: This mode is essentially for testing whether the pipeline runs correctly. By default, the write step is not executed in this mode. The pipeline goes through the query, transformation, model, and plot steps, and the experiment returns a plot to verify that the pipeline is functioning properly.
  • Production mode: This mode is used to run the pipeline in real-time. In production mode, the write step is executed, and the key difference is that the model step uses a pre-trained model for prediction rather than training a new model. At the end of the process, the results are written to the platform.

In summary, you should first run the pipeline in Experiment mode to train the model, save it, and view the plot results. After that, you can run the pipeline in Production mode to make predictions and write the output to the platform.

Testing the Pipeline

Now, we will run the pipeline in Experiment mode for testing. This will save the model and return the plot as part of the experiment. First, we create an experiment, and then we execute the pipeline in Experiment mode.

wizata_dsapi.api().upsert_experiment(
    "bearing_anomaly_experiment",
    "Bearing Anomaly Experiment",
    pipeline="bearing_anomaly_pipeline"
)

execution = wizata_dsapi.api().experiment(
    experiment='bearing_anomaly_experiment',
    pipeline='bearing_anomaly_pipeline',
    twin='motor_1'
)

Once the pipeline has run, you can check out the result in the platform under AI Lab > Experiments:

This means the pipeline ran successfully, trained the model, and saved it within the platform. We are ready to deploy the model in real time.

Deploy the Pipeline

The next step is to run the pipeline in Production mode for real-time operations. You can run it either directly through wizata_dsapi or using a trigger. The advantage of using a trigger is that it’s easy to set up and allows you to schedule the pipeline to run at specific intervals, such as every 5 minutes. Additionally, when running the pipeline in Production mode, it will automatically write the output to the platform by default.

Running it via the API:

production_mode = wizata_dsapi.api().run(
    pipeline='bearing_anomaly_pipeline',
    twin='motor_2'
)

If the pipeline is executed via a trigger, allowing real-time operation, we can specify an interval for execution, and the pipeline will run automatically at each interval. In this case, we will run the pipeline every 5 minutes. Below is an example of how to create the trigger with wizata_dsapi:

trigger = wizata_dsapi.Trigger()
trigger.interval = 300000  # interval in milliseconds (5 minutes)
trigger.delay = 0
trigger.pipeline_id = 'c7d569ce-5af5-448a-a37d-164fa19166ff'  # id of bearing_anomaly_pipeline
trigger.template_id = '61d63f8b-6142-43bc-bb98-ac146076601b'  # id of my_example_factory_template
trigger.twin_ids = [
    "893c5611-887c-45d1-acea-59bf189435b1", "c3f77812-513d-433b-a6d3-8866df0c1e64"  # motor_1 and motor_2
]
wizata_dsapi.api().create(trigger)

We can create the same trigger in the platform:

In the scenario described above, the pipeline is scheduled to run every 5 minutes for both Motor 1 and Motor 2. After this step, we can review the execution results to verify whether the pipeline is running properly for both Motor 1 and Motor 2. For this step, you should check the Execution logs page.

Conclusion

In conclusion, we have successfully developed an end-to-end anomaly detection model, demonstrating each step of the process. This included creating a template, building a pipeline, and running the pipeline in real-time.

By leveraging the template, we efficiently applied the anomaly detection model to both motors, collecting the bearing_anomaly datapoints. This streamlined approach ensures that the same pipeline can be used for multiple assets, allowing for real-time monitoring and detection of anomalies across both Motor 1 and Motor 2.

You can check out the bearing_anomaly outputs with the unique datapoint names in the explorer: