Working with pipelines
A pipeline can be created from a solution template in the app or through the API.
A pipeline has the following properties, illustrated in the sketch after this list:
- a logical key identifier of maximum 32 characters (required)
- a link to a template (required)
- a set of variables (optional)
- a list of steps (required)
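For illustration, a minimal declaration carrying only these properties might look as follows (a sketch using the constructor shown later in this article; the key, template ID, and variable names are placeholders):
import uuid
import wizata_dsapi

# Minimal pipeline declaration: key, template link, and variables.
# All values below are placeholders - replace them with your own.
pipeline = wizata_dsapi.Pipeline(
    key="my_pipeline",  # logical key, max 32 characters
    template_id=uuid.UUID('00000000-0000-0000-0000-000000000000'),  # your template ID
    variables={
        "from": "relative",
        "to": "relative"
    }
)
# Steps are added afterwards, e.g. with pipeline.add_query(...)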
Pipeline steps
Pipeline steps are connected by their inputs and outputs and must form a unique path.
Therefore, you need to follow these principles:
- A pipeline must have at least one step without an input as a starting point (e.g. a query step).
- Each input must be connected to one and only one output, but the same output can be used as the input of multiple different steps (see the sketch after this list).
- Depending on their type, some steps may not have inputs or outputs and may have specific rules to follow.
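For illustration, here is a simplified sketch of a valid step wiring (configs elided, function and dataframe names hypothetical): a query step with no input serves as the starting point, and its single output dataframe feeds two different script steps:
[
  {
    "type": "query",
    "config": {},
    "inputs": [],
    "outputs": [ { "dataframe": "raw_data" } ]
  },
  {
    "type": "script",
    "config": { "function": "clean_data" },
    "inputs": [ { "dataframe": "raw_data" } ],
    "outputs": [ { "dataframe": "cleaned_data" } ]
  },
  {
    "type": "script",
    "config": { "function": "compute_stats" },
    "inputs": [ { "dataframe": "raw_data" } ],
    "outputs": [ { "dataframe": "stats_data" } ]
  }
]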
Data used and transferred during the execution of a pipeline is not stored. Therefore, you must use a plot step to produce a data visualization, or a writer step to store data in time series and datapoints.
Steps are connected by named dataframes and can be of different types:
- A query step queries time-series data and produces a dataframe.
- A script step executes any Python function, e.g. a transformation or a call to a third-party app.
- A model step defines how to train or use a Machine Learning model.
- A write step lets you write your results back into Wizata.
- A plot step defines how you want to visualize your data.
Pipeline structure
To create or update a pipeline, you can use a JSON definition or define it directly from code. Below is a sample script that creates a pipeline with all the different step types available, followed by its equivalent JSON definition.
import uuid
import wizata_dsapi

pipeline = wizata_dsapi.Pipeline(
    key="my_pipeline",
    template_id=uuid.UUID('639e7537-0d80-4056-9670-06a63870d386'),
    variables={
        "var1": "string",
        "from": "relative",
        "to": "relative"
    }
)

# query step - fetch time-series data into the "query_output" dataframe
pipeline.add_query(
    request=wizata_dsapi.Request(
        datapoints=['dp1', 'dp2', 'dp3'],
        interval=60000,
        agg_method='mean',
        start="@from",
        end="@to"
    ),
    df_name="query_output"
)

# script step - run the "my_script" Python function on the query output
pipeline.add_transformation(
    script="my_script",
    input_df_names=['query_output'],
    output_df_names=['transform_data']
)

# model step - train or use the Machine Learning model "my_model_key"
pipeline.add_model(
    config=wizata_dsapi.MLModelConfig(
        model_key='my_model_key',
        train_script='linear_reg_sc',
        features=['dp1', 'dp2'],
        target_feat='dp3',
    ),
    input_df='transform_data',
    output_df='predicted_data'
)

# write step - store the "predicted_dp" column into the "dp_output" datapoint
pipeline.add_writer(
    config=wizata_dsapi.WriteConfig(
        datapoints={
            'predicted_dp': 'dp_output'
        }
    ),
    input_df='predicted_data'
)

# plot step - visualize the predictions with the "describe_df" plot script
pipeline.add_plot(
    df_name='predicted_data',
    script='describe_df'
)

wizata_dsapi.api().upsert_pipeline(pipeline)
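The equivalent JSON definition: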
{
"key": "my_pipeline",
"templateId": "639e7537-0d80-4056-9670-06a63870d386",
"steps": [
{
"type": "query",
"config": {
"equipments_list": [
{
"id": null,
"datapoints": [
"dp1",
"dp2",
"dp3"
],
"shift": "0s"
}
],
"timeframe": {
"start": "@from",
"end": "@to"
},
"aggregations": {
"agg_method": "mean",
"interval": 60000
},
"filters": {},
"options": {}
},
"inputs": [],
"outputs": [
{
"dataframe": "query_output"
}
]
},
{
"type": "script",
"config": {
"function": "my_script"
},
"inputs": [
{
"dataframe": "query_output"
}
],
"outputs": [
{
"dataframe": "transform_data"
}
]
},
{
"type": "model",
"config": {
"train_script": "linear_reg_sc",
"train_test_split_pct": 1,
"train_test_split_type": "ignore",
"target_feat": "dp3",
"features": [
"dp1",
"dp2"
],
"model_key": "my_model_key",
"output_property": "result",
"function": "predict"
},
"inputs": [
{
"dataframe": "transform_data"
}
],
"outputs": [
{
"dataframe": "predicted_data"
}
]
},
{
"type": "write",
"config": {
"datapoints": {
"predicted_dp": "dp_output"
}
},
"inputs": [
{
"dataframe": "predicted_data"
}
],
"outputs": []
}
],
"variables": {
"var1": "string",
"from": "relative",
"to": "relative"
}
}
Alternatively, you can create or update your pipeline directly from a JSON definition.
import json
import wizata_dsapi

# load the pipeline definition from a JSON file
with open('your_pipeline.json', 'r') as f:
    pipeline_dict = json.load(f)

pipeline = wizata_dsapi.Pipeline()
pipeline.from_json(pipeline_dict)

wizata_dsapi.api().upsert_pipeline(pipeline)
Technical documentation for the Pipeline object is also available in our Python SDK documentation.
Additionally, you can create a pipeline inside the platform by navigating to AI Lab > Pipelines and clicking 'Add'.

You will need to pass a mandatory list of steps inside the form. You may also set a template on the pipeline if you want to use it on multiple assets.
For example, we can use the following pipeline from the Anomaly Detection Solution tutorial and send it as a list:
[
{
"type": "query",
"config": {
"equipments_list": [
{
"id": null,
"datapoints": [
"Bearing1",
"Bearing2",
"Bearing3",
"Bearing4"
],
"shift": "0s"
}
],
"timeframe": {
"start": "now-1d",
"end": "now"
},
"aggregations": {
"agg_method": "mean",
"interval": 60000
},
"null": "all",
"template": {
"template_id": "61d63f8b-6142-43bc-bb98-ac146076601b"
},
"filters": {},
"group": {},
"options": {}
},
"inputs": [],
"outputs": [
{
"dataframe": "df"
}
]
},
{
"type": "script",
"config": {
"function": "bearing_preprocessing"
},
"inputs": [
{
"dataframe": "df"
}
],
"outputs": [
{
"dataframe": "df_processed"
}
]
},
{
"type": "model",
"config": {
"source": "wizata",
"train_script": "anomaly_detection_model",
"train_test_split_pct": 1,
"train_test_split_type": "ignore",
"features": [
"Bearing1",
"Bearing2",
"Bearing3",
"Bearing4"
],
"output_append": "True",
"output_columns_names": [
"bearing_anomaly"
],
"function": "predict",
"model_key": "ad_model",
"by_twin": "True",
"by_property": "False"
},
"inputs": [
{
"dataframe": "df_processed"
}
],
"outputs": [
{
"dataframe": "df_predict"
}
]
},
{
"type": "plot",
"config": {
"function": "bearing_ad_plot"
},
"inputs": [
{
"dataframe": "df_predict"
}
],
"outputs": []
},
{
"type": "write",
"config": {
"datapoints": {
"bearing_anomaly": "bearing_anomaly"
}
},
"inputs": [
{
"dataframe": "df_predict"
}
],
"outputs": []
}
]
You should replace the template_id in the JSON above with your own unique template ID.
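If you prefer to register this pipeline through the API instead of the form, one possible approach (a sketch reusing the from_json and upsert_pipeline calls shown earlier; the file name and pipeline key are placeholders) is to wrap the steps list in a full pipeline definition:
import json
import wizata_dsapi

# 'ad_steps.json' is a placeholder file containing the steps list above
with open('ad_steps.json', 'r') as f:
    steps = json.load(f)

pipeline = wizata_dsapi.Pipeline()
pipeline.from_json({
    "key": "bearing_ad_pipeline",                          # placeholder key
    "templateId": "61d63f8b-6142-43bc-bb98-ac146076601b",  # replace with your template ID
    "steps": steps
})
wizata_dsapi.api().upsert_pipeline(pipeline)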
Deleting a pipeline
To delete a pipeline, use the .delete() method with the pipeline object as a parameter, or delete it through the UI.
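For example (a sketch assuming the pipeline object created earlier in this article):
import wizata_dsapi

# Assumes `pipeline` is the Pipeline object created earlier,
# e.g. the one passed to upsert_pipeline()
wizata_dsapi.api().delete(pipeline)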
In the following articles, we will review each pipeline step in depth, starting with the Query step.