Execution
An execution holds all the timing, status and result information for a specific pipeline run, whether manual or automatic.
An execution tracks the status of a pipeline while it is being processed. It contains useful information about your pipeline processing:
- status - queued, started, abort requested, aborted, failed, completed
- pipeline - the pipeline that has been or is going to be executed
- template - the template, if one is referenced by your pipeline and/or trigger
- twin - the twin on which the pipeline is run; while a Trigger can specify multiple twins, it creates one execution for each Twin
- queued, started, created and updated dates, plus execution and waiting time - useful information on timing and duration.
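The fields listed above can be modelled in a few lines of Python. This is only a minimal sketch: `ExecutionStatus`, `ExecutionTiming` and `is_terminal` are hypothetical names that are not part of wizata_dsapi, and the definitions of waiting time (queue to start) and execution time (start to end) are assumptions made for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional


class ExecutionStatus(Enum):
    # Values mirror the statuses listed above; the enum itself is hypothetical.
    QUEUED = "queued"
    STARTED = "started"
    ABORT_REQUESTED = "abort requested"
    ABORTED = "aborted"
    FAILED = "failed"
    COMPLETED = "completed"


# Assumption: an execution in one of these statuses will not change anymore.
TERMINAL = {
    ExecutionStatus.ABORTED,
    ExecutionStatus.FAILED,
    ExecutionStatus.COMPLETED,
}


def is_terminal(status: ExecutionStatus) -> bool:
    """Return True when the execution has finished, successfully or not."""
    return status in TERMINAL


@dataclass
class ExecutionTiming:
    queued: datetime
    started: Optional[datetime] = None
    finished: Optional[datetime] = None

    def waiting_time(self) -> Optional[timedelta]:
        # Assumed definition: time spent in the queue before processing began.
        if self.started is None:
            return None
        return self.started - self.queued

    def execution_time(self) -> Optional[timedelta]:
        # Assumed definition: time between the start and the end of processing.
        if self.started is None or self.finished is None:
            return None
        return self.finished - self.started


timing = ExecutionTiming(
    queued=datetime(2024, 1, 1, 12, 0, 0),
    started=datetime(2024, 1, 1, 12, 0, 30),
    finished=datetime(2024, 1, 1, 12, 2, 30),
)
print(timing.waiting_time(), timing.execution_time())
```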
To run a pipeline from the User Interface, you should create an Experiment. Pipelines are run on the Engine.
Run a Pipeline
When you trigger an execution, the DSAPI puts your pipeline execution task on a queue. Depending on your choice, your pipeline is executed in experiment or production mode.
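As a purely illustrative mental model, the queueing behaviour can be pictured with an in-memory queue. The sketch below is not how the platform implements its queue: `ExecutionTask` and `enqueue_executions` are hypothetical names, and the only facts taken from this page are that tasks are queued, that a mode is chosen, and that one execution is created per twin.

```python
import queue
from dataclasses import dataclass
from typing import Iterable, List

# The two modes described on this page.
MODES = ("experiment", "production")


@dataclass(frozen=True)
class ExecutionTask:
    # Hypothetical task record: one pipeline, one twin, one mode.
    pipeline: str
    twin: str
    mode: str


def enqueue_executions(
    task_queue: "queue.Queue[ExecutionTask]",
    pipeline: str,
    twins: Iterable[str],
    mode: str,
) -> List[ExecutionTask]:
    """Put one task per twin on the queue and return the created tasks."""
    if mode not in MODES:
        raise ValueError(f"unknown mode: {mode!r}")
    tasks = [ExecutionTask(pipeline, twin, mode) for twin in twins]
    for task in tasks:
        task_queue.put(task)
    return tasks


pending: "queue.Queue[ExecutionTask]" = queue.Queue()
enqueue_executions(pending, "my_pipeline", ["twin_a", "twin_b"], "production")
print(pending.qsize())
```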
Experiment mode
Using Python or UI, you can test your pipeline and execute it as an Experiment. Check how you can Experiment within Wizata in the following article.
Production mode
To run a pipeline in production mode, use the run() method:
import wizata_dsapi

execution = wizata_dsapi.api().run(
    pipeline='my_pipeline',
    twin='my_twin'
)
In Python, when you use the run() method, your local Python version is sent to the platform as the desired version.
You can also set it manually if necessary:
experiment = wizata_dsapi.api().experiment(
    experiment='test_experiment',
    pipeline='centrifugal_pump_ad_training',
    version='3.12',
    twin='OG_PL1_PROC_PUMP1'
)
To check versions available in your app, please navigate to AI Lab and look at the overview. You can find more information in the following article: Upgrade your solution Python version.
By default, unlike in experiment mode, no models are trained and no plots are produced. You can change this behavior using the same parameters as in experiment mode. For example, if you want to keep training the model:
execution = wizata_dsapi.api().run(
    pipeline='my_pipeline',
    twin='my_twin',
    train=True,
    plot=False,
    write=True
)
Results
As a result, the platform produces an Execution object that is stored in the platform and linked to your template and your experiment.
Using the user interface, you can have a look at the Execution Logs page, where you will find all the information regarding the results of your experiments, just like in the image below.
Starting with v11.4, the top toolbar of the Executions page offers the following actions to manage and navigate your data:
- Select between pipeline production runs and experiment runs. (1)
- Time Range Selector: Define a specific start and end date/time window to narrow down the logs you want to inspect (you can select up to 30 days). (2)
- Search: Quickly find specific executions by typing keywords, names, or messages. (3)
- Refresh: Update the table to fetch and display the most recent execution data. (4)
- Clear: Instantly remove any applied filters or search queries to reset the default view. (5)
- Filters: Open an advanced menu to narrow down the execution logs based on specific parameters and criteria. (6)
Additionally, within the table itself, you can interact with several elements:
- Delete (Top-left button): Becomes active when you select a log using the Checkboxes on the left side of each row.
- View Details (Eye icon): Click on this icon next to any log to inspect the specific details of that individual execution.
- Column Filters (Funnel icon): Click the funnel icon next to headers like Level, Status, Pipeline, Template, and Trigger to quickly filter the data by those specific categories. (7)
The execution is always processed asynchronously through a queue. The experiment() and run() methods return an ExecutionLog immediately, but the pipeline may not yet be completed. To wait for the result, use the await_execution() method:
execution = wizata_dsapi.api().run(
    pipeline='my_pipeline',
    twin='my_twin'
)
# wait for the execution to complete (default timeout: 180s)
result = wizata_dsapi.api().await_execution(execution)
print(result.status)
You can customize the timeout and polling interval:
result = wizata_dsapi.api().await_execution(
    execution,
    timeout=300,      # max seconds to wait
    poll_interval=10  # seconds between status checks
)
await_execution() raises a TimeoutError if the execution does not complete within the timeout, and a RuntimeError if the execution finishes with a failed or aborted status.
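Because await_execution() signals both outcomes through exceptions, calling code usually wants to catch them explicitly. The following is a minimal sketch under stated assumptions: `wait_and_report` is a hypothetical helper, the "timeout" and "failed" return labels are invented for illustration, and `FakeApi` is a stand-in that only exists so the example runs without a platform connection.

```python
from types import SimpleNamespace


def wait_and_report(api, execution, timeout=300, poll_interval=10):
    """Wait for an execution and return its final status.

    `api` is any object exposing await_execution() as described above.
    Returns the hypothetical labels "timeout" or "failed" instead of raising.
    """
    try:
        result = api.await_execution(
            execution, timeout=timeout, poll_interval=poll_interval
        )
    except TimeoutError:
        # The execution did not complete within `timeout` seconds.
        return "timeout"
    except RuntimeError as error:
        # The execution finished with a failed or aborted status.
        print(f"execution did not succeed: {error}")
        return "failed"
    return result.status


class FakeApi:
    """Stand-in for the real client, used only to exercise the helper."""

    def __init__(self, outcome):
        self.outcome = outcome

    def await_execution(self, execution, timeout, poll_interval):
        if isinstance(self.outcome, Exception):
            raise self.outcome
        return self.outcome


print(wait_and_report(FakeApi(SimpleNamespace(status="completed")), execution=None))
print(wait_and_report(FakeApi(TimeoutError("too slow")), execution=None))
```

With the real client, the same helper would presumably be called as wait_and_report(wizata_dsapi.api(), execution).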
If you hover over the response in the Warnings column, you can find more information about failed executions. Additionally, you can click on the eye icon to see JSON-formatted text with all the execution details, including the status and the warning variables.
If you are producing plots, you can retrieve them with the following method:
plots = wizata_dsapi.api().plots(execution)
Search Execution Logs
You can search execution logs using the search_execution() method. It accepts a SearchQuery object (or its dictionary equivalent) to filter, sort and paginate results.
The following example retrieves execution logs from the last 7 days for a specific pipeline:
from datetime import datetime, timedelta, timezone
import wizata_dsapi
now = datetime.now(timezone.utc)
one_week_ago = now - timedelta(days=7)
now_ms = int(now.timestamp() * 1000)
one_week_ago_ms = int(one_week_ago.timestamp() * 1000)
results = wizata_dsapi.api().search_execution({
    "filters": {
        "pipeline": {"eq": "my_pipeline_key"},
        "startedDate": {"gte": one_week_ago_ms, "lte": now_ms}
    },
    "sort": [{"field": "queuedDate", "order": "desc"}],
    "page": 1,
    "size": 20,
    "serviceName": "pipeline_runner"
})
for log in results.results:
    print(log.execution_id, log.status, log.execution_time)
The method returns a PagedQueryResult containing the matching ExecutionLog entries and pagination metadata.
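The timestamp arithmetic and the query dictionary from the example above can be factored into a small helper, which avoids repeating the millisecond conversion in every call. This is a sketch only: `to_epoch_ms` and `build_execution_query` are hypothetical names, and the dictionary keys are copied from the example rather than from a formal schema.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def to_epoch_ms(moment: datetime) -> int:
    """Convert a timezone-aware datetime to epoch milliseconds."""
    return int(moment.timestamp() * 1000)


def build_execution_query(
    pipeline_key: str,
    days: int,
    page: int = 1,
    size: int = 20,
    now: Optional[datetime] = None,
) -> dict:
    """Build a search dictionary covering the last `days` days."""
    now = now or datetime.now(timezone.utc)
    start = now - timedelta(days=days)
    return {
        "filters": {
            "pipeline": {"eq": pipeline_key},
            "startedDate": {"gte": to_epoch_ms(start), "lte": to_epoch_ms(now)},
        },
        "sort": [{"field": "queuedDate", "order": "desc"}],
        "page": page,
        "size": size,
        "serviceName": "pipeline_runner",
    }


# A fixed reference time keeps the example reproducible.
fixed_now = datetime(2024, 1, 8, tzinfo=timezone.utc)
query = build_execution_query("my_pipeline_key", days=7, now=fixed_now)
print(query["filters"]["startedDate"])
```

The resulting dictionary can then be passed to search_execution() in place of the inline literal.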
If there are more results than the page size, you can iterate through all pages using the total attribute:
all_logs = []
page = 1
size = 100
now = datetime.now(timezone.utc)
one_day_ago = now - timedelta(days=1)
now_ms = int(now.timestamp() * 1000)
one_day_ago_ms = int(one_day_ago.timestamp() * 1000)
while True:
    results = wizata_dsapi.api().search_execution({
        "filters": {
            "pipeline": {"eq": "my_pipeline_key"},
            "startedDate": {"gte": one_day_ago_ms, "lte": now_ms}
        },
        "sort": [{"field": "queuedDate", "order": "desc"}],
        "page": page,
        "size": size,
        "serviceName": "pipeline_runner"
    })
    all_logs.extend(results.results)
    if page * size >= results.total:
        break
    page += 1
print(f"Retrieved {len(all_logs)} execution logs out of {results.total}")
You can also perform a free-text search across execution log messages using the _search filter:
results = wizata_dsapi.api().search_execution({
    "filters": {
        "_search": {"query": "my search text"},
        "pipeline": {"eq": "my_pipeline_key"},
        "startedDate": {"gte": one_day_ago_ms, "lte": now_ms}
    },
    "sort": [{"field": "queuedDate", "order": "desc"}],
    "page": 1,
    "size": 20,
    "serviceName": "pipeline_runner"
})
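The page-by-page loop shown earlier in this section can also be wrapped in a generator, so that callers iterate over logs without tracking page numbers themselves. The sketch below assumes only what the examples on this page show, namely that the search result exposes results and total; `iter_execution_logs` is a hypothetical helper and `fake_search` is a stand-in that makes the example runnable offline.

```python
from types import SimpleNamespace
from typing import Any, Callable, Iterator


def iter_execution_logs(
    search: Callable[[dict], Any], query: dict, size: int = 100
) -> Iterator[Any]:
    """Yield every matching log by requesting pages until `total` is covered."""
    page = 1
    while True:
        paged = search({**query, "page": page, "size": size})
        yield from paged.results
        # Stop once the fetched pages cover the reported total, or when a
        # page comes back empty (this guards against an endless loop).
        if not paged.results or page * size >= paged.total:
            break
        page += 1


def fake_search(query: dict) -> SimpleNamespace:
    """Stand-in for search_execution(): serves 250 dummy logs in pages."""
    data = list(range(250))
    start = (query["page"] - 1) * query["size"]
    return SimpleNamespace(
        results=data[start:start + query["size"]], total=len(data)
    )


logs = list(iter_execution_logs(fake_search, {"filters": {}}, size=100))
print(len(logs))
```

With the real client, `search` would presumably be wizata_dsapi.api().search_execution and `query` a dictionary like the ones shown above, without the page and size keys.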