Tutorial: Anomaly Detection - Email & Slack Alerts

Prerequisites

In this tutorial, we will continue with the example from the Tutorial: Anomaly Detection Solution. We highly recommend reading it before proceeding.

Additionally, you can find more details about send_alerts() method in our dedicated article: Send alert notifications to services using pipelines

Slack webhook configuration

For a step-by-step guide and troubleshooting on Slack webhooks, refer to the official Slack documentation

If you already have a Slack workspace, you need to associate a Slack app with a specific channel and Enable incoming webhooks in the app configuration. For example, we will create a custom app called Notification Alert and link it to the #sandbox-channel

Once the app is successfully linked and the webhook is enabled for the channel, you will see a webhook URL that can be copied and added to your custom.ini file.

[slack]
slack_custom_webhook=https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXX

Configuring our pipeline

To begin, we will retrieve the pipeline object from the Anomaly Detection Solution using .get(), passing the correspondent key.

pipeline = wizata_dsapi.api().get(key="bearing_anomaly_pipeline", entity=wizata_dsapi.Pipeline)

Next, we will create a script to send email and Slack alerts whenever an anomaly is detected.

Since we are using a Trigger to automate the pipeline execution, the Write step stores a -1 value in the bearing_anomaly datapoint every time an anomaly occurs. We will leverage this behavior to trigger the alerting system.

We will analyze the last five values from the df_predict Data Frame for each twin to determine whether an anomaly occurred during that period.

def send_slack_and_email_alerts(context: wizata_dsapi.Context):

    twin_object = context.api.get(id=context.registration.twin_id, entity=wizata_dsapi.Twin)

    df = context.dataframe.tail(5)

    if (df["bearing_anomaly"] == -1).any():
        # Email alerts
        context.api.send_alerts(
            message=f"Alert: There is an anomaly detected on twin: {twin_object.name}, please verify your execution logs for more information.",
            recipients=["[email protected]"],
            alert_type=wizata_dsapi.AlertType.EMAIL,
            subject="Anomaly detection alert"
        )
        # Slack alerts
        context.api.send_alerts(
            message=f"Alert: There is an anomaly detected on twin: {twin_object.name}, please verify your execution logs for more information",
            alert_type=wizata_dsapi.AlertType.SLACK,
            recipients=["sandbox-channel"],
            webhook='slack_custom_webhook'
        )
wizata_dsapi.api().upsert(send_slack_and_email_alerts)

Finally, we need to add this transformation to our pipeline object and upsert it:

pipeline.add_transformation( # alert script step
    script="send_slack_and_email_alerts",
    input_df_names=['df_predict']
)

wizata_dsapi.api().upsert_pipeline(pipeline)

For a complete overview of the updated pipeline steps, please refer to:

def send_slack_and_email_alerts(context: wizata_dsapi.Context):

    twin_object = context.api.get(id=context.registration.twin_id, entity=wizata_dsapi.Twin)

    df = context.dataframe.tail(5)

    if (df["bearing_anomaly"] == -1).any():
        # Email alerts
        context.api.send_alerts(
            message=f"Alert: There is an anomaly detected on twin: {twin_object.name}, please verify your execution logs for more information.",
            recipients=["[email protected]"],
            alert_type=wizata_dsapi.AlertType.EMAIL,
            subject="Anomaly detection alert"
        )
        # Slack alerts
        context.api.send_alerts(
            message=f"Alert: There is an anomaly detected on twin: {twin_object.name}, please verify your execution logs for more information",
            alert_type=wizata_dsapi.AlertType.SLACK,
            recipients=["sandbox-channel"],
            webhook='slack_custom_webhook'
        )
        
        
wizata_dsapi.api().upsert(send_slack_and_email_alerts)


pipeline = wizata_dsapi.api().get(key="bearing_anomaly_pipeline", entity=wizata_dsapi.Pipeline)

pipeline.add_transformation( # alert script step
    script="send_slack_and_email_alerts",
    input_df_names=['df_predict']
)

wizata_dsapi.api().upsert_pipeline(pipeline)

Visualizing our results

Once the pipeline executes for the first time, you will start receiving notifications via email and Slack whenever an anomaly is detected by the model.

This is how the alert will appear in your Slack channel:

And in your email inbox:

If you don’t see the email notification, check your spam folder and ensure that emails from Wizata are allowed to avoid missing future alerts.