Tutorial: Anomaly Detection Solution
In this tutorial, we will show you, step by step, how to build an anomaly detection solution for the vibration data collected from the bearing sensors of two motors. The goal is to monitor the sensor data continuously and identify any unusual patterns that might indicate motor wear, bearing failure, or other operational issues.
We are going to use an Isolation Forest model, a machine learning technique specifically designed to isolate anomalies by identifying data points that behave differently from the majority. The model processes the sensor data to detect deviations, enabling us to identify potential issues early and take corrective action before they result in equipment failure.
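As a quick illustration of how Isolation Forest flags anomalies, here is a minimal, self-contained scikit-learn sketch on synthetic data (the values below are made up and are not part of the tutorial's sample dataset). predict() labels anomalies as -1 and normal points as 1, and score_samples() provides a continuous anomaly score:
import numpy as np
from sklearn.ensemble import IsolationForest

# Synthetic vibration-like readings: mostly moderate values plus a few large spikes
rng = np.random.default_rng(0)
normal = rng.normal(loc=0.12, scale=0.02, size=(500, 4))   # typical vibration levels
spikes = rng.normal(loc=0.60, scale=0.05, size=(5, 4))     # abnormal readings
data = np.vstack([normal, spikes])

# contamination is the expected fraction of anomalies in the data
iso = IsolationForest(contamination=0.01, random_state=42).fit(data)

labels = iso.predict(data)        # -1 = anomaly, 1 = normal
scores = iso.score_samples(data)  # lower scores indicate more anomalous points
print("points flagged as anomalies:", int((labels == -1).sum()))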
By running this anomaly detection model, we aim to improve the reliability and performance of the motors by predicting failures and scheduling maintenance more effectively. The objective is to detect, in real time, anomalies on our motors. We also want a single solution that is easy to edit and maintain for all of our similar motors.
Prerequisites
The tutorial focuses on a motor's digital twin, specifically its bearing component based on our sample data representing vibration sensors and associated digital twin.
Your environment should have the sample data flowing (mt1_bearing1, mt1_bearing2, ... mt2_bearing4), and your twin structure should look like the following, with all sensors attached to their respective bearings on Motor 1 and Motor 2:
If this is not already the case, please check the following articles to complete these prerequisites.
- How to stream and loop the relevant sample data
- How to create and connect Twin Units
Create a Template
The first step involves creating a Template with properties such as datapoints, JSON objects, floats, etc. The template helps organise everything in one place.
Since the anomaly detection model must be applied to two motors, the template allows us to use a single pipeline to apply the model to both motors efficiently. A template named "my_example_factory_template" could include the following:
- Datapoints: Bearing1, Bearing2, Bearing3, Bearing4, bearing_anomaly
- Variable: Threshold
- JSON: Bearing_properties
Using the Python Toolkit, you can register a template using the following logic:
# create the template
wizata_dsapi.api().upsert_template(
    key="my_example_factory_template",
    name="My Example Factory Template"
)

# datapoint properties (Bearing1..Bearing4 and the anomaly output)
for property_name in ["Bearing1", "Bearing2", "Bearing3", "Bearing4", "bearing_anomaly"]:
    wizata_dsapi.api().add_template_property(
        template='my_example_factory_template',
        property_name=property_name,
        property_type='datapoint'
    )

# float property
wizata_dsapi.api().add_template_property(
    template='my_example_factory_template',
    property_name='Threshold',
    property_type='float'
)

# JSON property
wizata_dsapi.api().add_template_property(
    template='my_example_factory_template',
    property_name='Bearing_properties',
    property_type='json'
)
For the anomaly detection output, we have to create logical datapoints: "mt1_bearing_anomaly" for Motor 1 and "mt2_bearing_anomaly" for Motor 2. These datapoints must exist in the platform before registering the twins, so that we can map them:
for hardware_id in ["mt1_bearing_anomaly", "mt2_bearing_anomaly"]:
    datapoint = wizata_dsapi.DataPoint(hardware_id=hardware_id, name=hardware_id)
    wizata_dsapi.api().create(datapoint)
Once your template is created, the next step is to register a Twin. Each asset will have fixed values for variables and/or mapped datapoints. When querying a registered Twin based on a template, there is no need to know the specific details of each datapoint.
To register the Twin, you must create a mapping between the template property names and their corresponding values:
wizata_dsapi.api().register_twin(
    template="my_example_factory_template",
    twin="motor_1",
    properties={
        "Bearing1": "mt1_bearing1",
        "Bearing2": "mt1_bearing2",
        "Bearing3": "mt1_bearing3",
        "Bearing4": "mt1_bearing4",
        "bearing_anomaly": "mt1_bearing_anomaly",
        "Threshold": 3.1,
        "Bearing_properties": {
            "bearing_pressure_threshold": 89.5,
            "bearing_temperature_threshold": 67
        }
    }
)
We register Motor 2 against the same template in the same way:
wizata_dsapi.api().register_twin(
    template="my_example_factory_template",
    twin="motor_2",
    properties={
        "Bearing1": "mt2_bearing1",
        "Bearing2": "mt2_bearing2",
        "Bearing3": "mt2_bearing3",
        "Bearing4": "mt2_bearing4",
        "bearing_anomaly": "mt2_bearing_anomaly",
        "Threshold": 3.8,
        "Bearing_properties": {
            "bearing_pressure_threshold": 88.5,
            "bearing_temperature_threshold": 66
        }
    }
)
Alternatively, the template can be created directly from the UI. Please see this article to create the template manually:
At the end of this process, we have a template that includes two Digital Twins, Motor 1 and Motor 2, as shown below:
Create a Pipeline
After creating the template, the next step is to develop a Pipeline to integrate the solution. A pipeline consists of a series of steps necessary for the project, including querying, transformation, machine learning (ML) models, plots, and writing results.
import uuid

# Create the pipeline
pipeline = wizata_dsapi.Pipeline(
    key="bearing_anomaly_pipeline",
    template_id=uuid.UUID("61d63f8b-6142-43bc-bb98-ac146076601b")  # the template id, taken from the end of the template's link in the platform
)
Query
The first step of the pipeline is a query:
pipeline.add_query(  # query step
    wizata_dsapi.Request(
        datapoints=["Bearing1", "Bearing2", "Bearing3", "Bearing4"],  # template property names
        start="now-1d",
        end="now",
        agg_method="mean",
        interval=60000  # aggregation interval in milliseconds (60000 ms = 1 minute)
    ),
    df_name="df"
)
Transformation
For the anomaly detection solution, you will create the transformation, model, and plot scripts one by one. The process begins with the transformation script:
def bearing_preprocessing(context: wizata_dsapi.Context):
    df = context.dataframe  # the dataframe produced by the query step of the pipeline
    df = df[(df > 0.05).any(axis=1)]  # drop downtimes (rows where all sensors read 0.05 or below)
    df = df.dropna()  # drop null values
    return df

wizata_dsapi.api().upsert(bearing_preprocessing)  # saves the transformation script into the platform
pipeline.add_transformation( # script step
script="bearing_preprocessing",
input_df_names=['df'],
output_df_names=['df_processed']
)
Training the Model
The transformation script removes downtimes and eliminates null values, which is the first step in preparing the data for the machine learning model.
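To make the filtering concrete, here is a small, self-contained sketch on made-up values (not the tutorial's sample data) showing which rows the transformation keeps:
import pandas as pd
import numpy as np

df = pd.DataFrame({
    "Bearing1": [0.12, 0.01, 0.11, np.nan],
    "Bearing2": [0.10, 0.02, 0.09, 0.10],
    "Bearing3": [0.14, 0.03, 0.12, 0.13],
    "Bearing4": [0.11, 0.01, np.nan, 0.12],
})

df = df[(df > 0.05).any(axis=1)]  # row 1 is dropped: every sensor is at or below 0.05 (downtime)
df = df.dropna()                  # rows 2 and 3 are dropped: they contain missing readings
print(df)                         # only row 0 remains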
The next step is to create the model script:
def anomaly_detection_model(context: wizata_dsapi.Context):
    df = context.dataframe
    from sklearn.ensemble import IsolationForest

    # Create and fit the model; adjust contamination based on your data
    iso_forest = IsolationForest(contamination=0.01, random_state=42)
    iso_forest = iso_forest.fit(df)
    context.set_model(iso_forest, df.columns)
wizata_dsapi.api().upsert(anomaly_detection_model)
pipeline.add_model(
    config=wizata_dsapi.MLModelConfig(
        train_script='anomaly_detection_model',  # the model training script
        features=["Bearing1", "Bearing2", "Bearing3", "Bearing4"],  # input features for the model
        output_append=True,  # append the prediction results to the input dataframe
        output_columns_names=["bearing_anomaly"],  # name of the prediction output column, filled with model.predict(df[features])
        function="predict",  # the method used at inference time: transform, predict, etc.
        model_key="ad_model",  # the name under which the model is saved in the platform
        by_twin=True  # save a separate model per twin, e.g. ad_model.motor_1 and ad_model.motor_2
    ),
    input_df='df_processed',
    output_df='df_predict'
)
For the model step we have two twins, and the pipeline must be run for both, producing two separate models. Setting by_twin to True means the two models are saved separately, named ad_model.motor_1 and ad_model.motor_2.
After the model step, df_predict contains the columns ["Bearing1", "Bearing2", "Bearing3", "Bearing4", "bearing_anomaly"].
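Conceptually, output_append=True with output_columns_names=["bearing_anomaly"] amounts to adding a prediction column to the processed dataframe. The following self-contained sketch illustrates this behaviour with plain pandas and scikit-learn on made-up values; it is not the platform's implementation, and the small dataframe and contamination value are purely illustrative:
import pandas as pd
from sklearn.ensemble import IsolationForest

features = ["Bearing1", "Bearing2", "Bearing3", "Bearing4"]

# Made-up processed data standing in for df_processed
df_processed = pd.DataFrame(
    [[0.12, 0.10, 0.14, 0.11],
     [0.13, 0.11, 0.13, 0.12],
     [0.55, 0.60, 0.58, 0.57]],  # an obviously deviating row
    columns=features,
)

iso_forest = IsolationForest(contamination=0.3, random_state=42).fit(df_processed)

# Rough local equivalent of what the model step produces as df_predict
df_predict = df_processed.copy()
df_predict["bearing_anomaly"] = iso_forest.predict(df_processed[features])  # -1 = anomaly, 1 = normal
print(df_predict)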
Plot
Next, we will create a plot script to visualise anomalies by marking them with red vertical lines. This step is intended for experimentation purposes only:
def bearing_ad_plot(context: wizata_dsapi.Context):
    df = context.dataframe
    import plotly.graph_objects as go

    # Create the line plot
    fig = go.Figure()

    # Add a trace for each sensor column (the timestamp is the index; exclude the 'bearing_anomaly' column)
    for column in df.columns[:-1]:
        fig.add_trace(go.Scatter(x=df.index, y=df[column], mode='lines', name=column))

    # Add vertical lines at the timestamps flagged as anomalies (-1)
    anomaly_times = df[df['bearing_anomaly'] == -1].index
    for anomaly_time in anomaly_times:
        fig.add_vline(x=anomaly_time, line=dict(color='red', width=2))

    # Update layout
    fig.update_layout(
        title="Sensor Data with Anomaly Detection",
        xaxis_title="Timestamp",
        yaxis_title="Sensor Values",
        showlegend=True
    )

    context.set_plot(fig, "Bearing Anomaly Detection")  # save the figure with this title

wizata_dsapi.api().upsert(bearing_ad_plot)
pipeline.add_plot(df_name="df_predict", script="bearing_ad_plot") # plot step
Write
The final step is to write the results to the platform. For this use case, the result to be written is "bearing_anomaly".
pipeline.add_writer(  # writer step
    config=wizata_dsapi.WriteConfig(
        datapoints={
            "bearing_anomaly": "bearing_anomaly"
        }
    ),
    input_df='df_predict'
)
After completing all these steps, the next action is to upsert the pipeline to the platform.
wizata_dsapi.api().upsert_pipeline(pipeline)
Here are all the pipeline steps together:
import uuid

# Create the pipeline
pipeline = wizata_dsapi.Pipeline(
    key="bearing_anomaly_pipeline",
    template_id=uuid.UUID("61d63f8b-6142-43bc-bb98-ac146076601b")  # the template id, taken from the end of the template's link in the platform
)

pipeline.add_query(  # query step
    wizata_dsapi.Request(
        datapoints=["Bearing1", "Bearing2", "Bearing3", "Bearing4"],  # template property names
        start="now-1d",
        end="now",
        agg_method="mean",
        interval=60000  # aggregation interval in milliseconds (60000 ms = 1 minute)
    ),
    df_name="df"
)

pipeline.add_transformation(  # script step
    script="bearing_preprocessing",
    input_df_names=['df'],
    output_df_names=['df_processed']
)

pipeline.add_model(
    config=wizata_dsapi.MLModelConfig(
        train_script='anomaly_detection_model',  # the model training script
        features=["Bearing1", "Bearing2", "Bearing3", "Bearing4"],  # input features for the model
        output_append=True,  # append the prediction results to the input dataframe
        output_columns_names=["bearing_anomaly"],  # name of the prediction output column, filled with model.predict(df[features])
        function="predict",  # the method used at inference time: transform, predict, etc.
        model_key="ad_model",  # the name under which the model is saved in the platform
        by_twin=True  # save a separate model per twin, e.g. ad_model.motor_1 and ad_model.motor_2
    ),
    input_df='df_processed',
    output_df='df_predict'
)

pipeline.add_plot(df_name="df_predict", script="bearing_ad_plot")  # plot step

pipeline.add_writer(  # writer step
    config=wizata_dsapi.WriteConfig(
        datapoints={
            "bearing_anomaly": "bearing_anomaly"
        }
    ),
    input_df='df_predict'
)

wizata_dsapi.api().upsert_pipeline(pipeline)
This is how the pipeline looks in the platform:
Testing and Deploying the Pipeline
Once the pipeline is in the platform, the next step is to test whether it runs correctly. To do this, we will run the pipeline in Experiment mode. There are two ways to run a pipeline: Experiment mode and Production mode. The difference between them is as follows:
- Experiment mode: This mode is essentially for testing whether the pipeline runs correctly. By default, the write step is not executed in this mode. The pipeline goes through the query, transformation, model, and plot steps, and the experiment returns a plot to verify that the pipeline is functioning properly.
- Production mode: This mode is used to run the pipeline in real-time. In production mode, the write step is executed, and the key difference is that the model step uses a pre-trained model for prediction rather than training a new model. At the end of the process, the results are written to the platform.
In summary, you should first run the pipeline in Experiment mode to train the model, save it, and view the plot results. After that, you can run the pipeline in Production mode to make predictions and write the output to the platform.
Testing the Pipeline
Now, we will run the pipeline in Experiment mode for testing. This will save the model and return the plot as part of the experiment. First, we create an experiment, and then we execute the pipeline in Experiment mode.
wizata_dsapi.api().upsert_experiment(
    "bearing_anomaly_experiment",
    "Bearing Anomaly Experiment",
    pipeline="bearing_anomaly_pipeline"
)

execution = wizata_dsapi.api().experiment(
    experiment='bearing_anomaly_experiment',
    pipeline='bearing_anomaly_pipeline',
    twin='motor_1'
)
Once the pipeline has run, you can check the result in the platform under AI Lab > Experiments:
This means the pipeline ran successfully, trained the model, and saved it within the platform. Since by_twin is enabled, run the same experiment with twin='motor_2' as well, so that both ad_model.motor_1 and ad_model.motor_2 exist before moving to production. We are then ready to deploy the model in real time.
Deploy the Pipeline
The next step is to run the pipeline in Production mode for real-time operations. You can run it either directly through wizata_dsapi or using a trigger. The advantage of using a trigger is that it’s easy to set up and allows you to schedule the pipeline to run at specific intervals, such as every 5 minutes. Additionally, when running the pipeline in Production mode, it will automatically write the output to the platform by default.
Running it via the API:
production_mode = wizata_dsapi.api().run(
    pipeline='bearing_anomaly_pipeline',
    twin='motor_2'
)
If the pipeline is executed via a trigger, allowing real-time operation, we can specify an interval for execution, and the pipeline will run automatically at each interval. In this case we will run the pipeline every 5 minutes. Below is an example of how to create the trigger with wizata_dsapi:
trigger = wizata_dsapi.Trigger()
trigger.interval = 300000  # interval in milliseconds (300000 ms = 5 minutes)
trigger.delay = 0
trigger.pipeline_id = 'c7d569ce-5af5-448a-a37d-164fa19166ff'   # id of bearing_anomaly_pipeline
trigger.template_id = '61d63f8b-6142-43bc-bb98-ac146076601b'   # id of my_example_factory_template
trigger.twin_ids = [  # twin ids of motor_1 and motor_2
    "893c5611-887c-45d1-acea-59bf189435b1",
    "c3f77812-513d-433b-a6d3-8866df0c1e64"
]
wizata_dsapi.api().create(trigger)
We can create the same trigger in the platform:
In the scenario described above, the pipeline is scheduled to run every 5 minutes for both Motor 1 and Motor 2. After this step, we can review the execution results on the Execution logs page to verify that the pipeline is running properly for both motors.
Conclusion
In conclusion, we have successfully developed an end-to-end anomaly detection model, demonstrating each step of the process. This included creating a template, building a pipeline, and running the pipeline in real-time.
By leveraging the template, we efficiently applied the anomaly detection model to both motors, collecting the bearing_anomaly datapoints. This streamlined approach ensures that the same pipeline can be used for multiple assets, allowing for real-time monitoring and detection of anomalies across both Motor 1 and Motor 2.
You can check out the bearing_anomaly outputs with the unique datapoint names in the explorer: