Working with pipelines

A pipeline can be created from a solution template in the app or through the API.

A pipeline has the following properties:

  • a logical key identifier of maximum 32 characters (required)
  • a link to a template (required)
  • a set of variables (optional)
  • a list of steps (required)
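
As a minimal sketch, these properties map directly onto the Pipeline constructor used in the full sample further below (the key, variables and template ID here are placeholders):

import uuid
import wizata_dsapi

pipeline = wizata_dsapi.Pipeline(
    key="demo_pipeline",  # logical key identifier, maximum 32 characters
    template_id=uuid.UUID('00000000-0000-0000-0000-000000000000'),  # placeholder template ID
    variables={
        "from": "relative",  # optional set of variables
        "to": "relative"
    }
)
# steps are then appended with helpers such as add_query() or add_writer()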

Pipeline steps

Pipeline steps are connected to each other by their inputs and outputs and must form a unique path.

Therefore, you need to follow these principles:

  • A pipeline must have at least one step without an input as a starting point (e.g. a query step).
  • Each input must be connected to one and only one output, but the same output can be used as the input of multiple different steps (see the sketch after this list).
  • Depending on their type, some steps may not have inputs or outputs and may have specific rules to follow.
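
For example, the fan-out rule looks like this in code (a minimal sketch using the add_transformation helper from the full sample below; the script and dataframe names are illustrative):

# assuming `pipeline` is a wizata_dsapi.Pipeline whose query step outputs "query_output"
# the same output dataframe can feed two different steps ...
pipeline.add_transformation(
    script="clean_script",
    input_df_names=['query_output'],
    output_df_names=['clean_data']
)
pipeline.add_transformation(
    script="stats_script",
    input_df_names=['query_output'],  # ... but each input names exactly one upstream output
    output_df_names=['stats_data']
)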

Data used and transferred during the execution of a pipeline are not stored. Therefore, you must use a plot script to produce a data visualization, or a writer step to store data inside time series and datapoints; both are illustrated in the sample below.

Steps are connected to each other by named dataframes and can be of the following types:

  • A query step can be used to query time-series data and produce a dataframe.
  • A script step can be used to execute any Python function, e.g. a transformation or a call to a third-party app.
  • A model step defines how to train or query a machine learning model.
  • A write step allows you to write back your results within Wizata.
  • A plot step allows you to define how you want to visualize your data.

Pipeline structure

To create or update a pipeline, you can use a JSON definition or define it from code directly. Below is a sample script that creates a pipeline with all the different step types available, followed by its equivalent JSON definition.

import uuid
import wizata_dsapi

pipeline = wizata_dsapi.Pipeline(
    key="my_pipeline",
    template_id=uuid.UUID('639e7537-0d80-4056-9670-06a63870d386'),
    variables={
        "var1": "string",
        "from": "relative",
        "to": "relative"
    }
)
pipeline.add_query(
    request=wizata_dsapi.Request(
        datapoints=['dp1','dp2','dp3'],
        interval=60000,
        agg_method='mean',
        start="@from",
        end="@to"
    ),
    df_name="query_output"
)
pipeline.add_transformation(
    script="my_script",
    input_df_names=['query_output'],
    output_df_names=['transform_data']
)
pipeline.add_model(
    config=wizata_dsapi.MLModelConfig(
        model_key='my_model_key',
        train_script='linear_reg_sc',
        features=['dp1', 'dp2'],
        target_feat='dp3',
    ),
    input_df='transform_data',
    output_df='predicted_data'
)
pipeline.add_writer(
    config=wizata_dsapi.WriteConfig(
        datapoints={
            'predicted_dp': 'dp_output'
        }
    ),
    input_df='predicted_data'
)
pipeline.add_plot(
    df_name='predicted_data',
    script='describe_df'
)
wizata_dsapi.api().upsert_pipeline(pipeline)

The equivalent JSON definition:
{
    "key": "my_pipeline",
    "templateId": "639e7537-0d80-4056-9670-06a63870d386",
    "steps": [
      {
        "type": "query",
        "config": {
          "equipments_list": [
            {
              "id": null,
              "datapoints": [
                "dp1",
                "dp2",
                "dp3"
              ],
              "shift": "0s"
            }
          ],
          "timeframe": {
            "start": "@from",
            "end": "@to"
          },
          "aggregations": {
            "agg_method": "mean",
            "interval": 60000
          },
          "filters": {},
          "options": {}
        },
        "inputs": [],
        "outputs": [
          {
            "dataframe": "query_output"
          }
        ]
      },
      {
        "type": "script",
        "config": {
          "function": "my_script"
        },
        "inputs": [
          {
            "dataframe": "query_output"
          }
        ],
        "outputs": [
          {
            "dataframe": "transform_data"
          }
        ]
      },
      {
        "type": "model",
        "config": {
          "train_script": "linear_reg_sc",
          "train_test_split_pct": 1,
          "train_test_split_type": "ignore",
          "target_feat": "dp3",
          "features": [
            "dp1",
            "dp2"
          ],
          "model_key": "my_model_key",
          "output_property": "result",
          "function": "predict"
        },
        "inputs": [
          {
            "dataframe": "transform_data"
          }
        ],
        "outputs": [
          {
            "dataframe": "predicted_data"
          }
        ]
      },
      {
        "type": "write",
        "config": {
          "datapoints": {
            "predicted_dp": "dp_output"
          }
        },
        "inputs": [
          {
            "dataframe": "predicted_data"
          }
        ],
        "outputs": []
      },
      {
        "type": "plot",
        "config": {
          "function": "describe_df"
        },
        "inputs": [
          {
            "dataframe": "predicted_data"
          }
        ],
        "outputs": []
      }
    ],
    "variables": {
      "var1": "string",
      "from": "relative",
      "to": "relative"
    }
}

Alternatively, you can create or update your pipeline directly from a JSON definition.

import wizata_dsapi
import json

# load the JSON definition and build a Pipeline object from it
with open('your_pipeline.json', 'r') as f:
    pipeline_dict = json.load(f)
pipeline = wizata_dsapi.Pipeline()
pipeline.from_json(pipeline_dict)

# create or update the pipeline on the platform
wizata_dsapi.api().upsert_pipeline(pipeline)

Technical documentation for the Pipeline object is also available in our Python SDK documentation.

Additionally, you can create a pipeline inside the platform by navigating to AI Lab > Pipelines and clicking 'Add'.

You will need to pass a mandatory list of steps inside the form. You may also set a template on the pipeline if you want to use it on multiple assets.

For example, we can use the following pipeline from the Anomaly Detection Solution tutorial and send it as a list:

[
  {
    "type": "query",
    "config": {
      "equipments_list": [
        {
          "id": null,
          "datapoints": [
            "Bearing1",
            "Bearing2",
            "Bearing3",
            "Bearing4"
          ],
          "shift": "0s"
        }
      ],
      "timeframe": {
        "start": "now-1d",
        "end": "now"
      },
      "aggregations": {
        "agg_method": "mean",
        "interval": 60000
      },
      "null": "all",
      "template": {
        "template_id": "61d63f8b-6142-43bc-bb98-ac146076601b"
      },
      "filters": {},
      "group": {},
      "options": {}
    },
    "inputs": [],
    "outputs": [
      {
        "dataframe": "df"
      }
    ]
  },
  {
    "type": "script",
    "config": {
      "function": "bearing_preprocessing"
    },
    "inputs": [
      {
        "dataframe": "df"
      }
    ],
    "outputs": [
      {
        "dataframe": "df_processed"
      }
    ]
  },
  {
    "type": "model",
    "config": {
      "source": "wizata",
      "train_script": "anomaly_detection_model",
      "train_test_split_pct": 1,
      "train_test_split_type": "ignore",
      "features": [
        "Bearing1",
        "Bearing2",
        "Bearing3",
        "Bearing4"
      ],
      "output_append": "True",
      "output_columns_names": [
        "bearing_anomaly"
      ],
      "function": "predict",
      "model_key": "ad_model",
      "by_twin": "True",
      "by_property": "False"
    },
    "inputs": [
      {
        "dataframe": "df_processed"
      }
    ],
    "outputs": [
      {
        "dataframe": "df_predict"
      }
    ]
  },
  {
    "type": "plot",
    "config": {
      "function": "bearing_ad_plot"
    },
    "inputs": [
      {
        "dataframe": "df_predict"
      }
    ],
    "outputs": []
  },
  {
    "type": "write",
    "config": {
      "datapoints": {
        "bearing_anomaly": "bearing_anomaly"
      }
    },
    "inputs": [
      {
        "dataframe": "df_predict"
      }
    ],
    "outputs": []
  }
]

You should replace the template_id in the JSON above with your own unique template ID.

Deleting a pipeline

To delete a pipeline, pass the pipeline object to the .delete() method, or remove it from the UI.
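
For example, a minimal sketch assuming `pipeline` is the Pipeline object created earlier in this article:

import wizata_dsapi

# delete the pipeline by passing the object itself
wizata_dsapi.api().delete(pipeline)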

In the following articles, we will review each pipeline step in depth, starting with the Query step.

