Using Edge to connect your data

You can use our Edge to connect your network and data streams to Wizata. It contains pre-defined connectors for various protocols.

Prerequisite - Configure and set up a device

To start, you need to install and configure an edge device. It can be deployed on-premises or in the cloud, and you can deploy multiple edge devices to suit your architectural needs.

Once your edge hardware or virtual machine is installed, you need to register it and set it up.

Consumers & Writers

The edge is configured by defining where data is read from (consumers) and where it is sent back (writers).

By default, the edge is able to:

  • write data to its local time-series database and to the cloud platform.
  • read data as results of its internal AI/ML Pipeline Engine.

The configuration of additional consumers, writers and triggers is defined as a JSON structure and synchronised from cloud to edge regularly.

Additional consumers and writers can be configured to add new sources and destinations to the data flow.


OPC-UA

You can configure an OPC-UA consumer to pull data from your OPC-UA server in polling mode (default) or subscription mode (server pushes on data change).

Consumer

Common configuration keys:

  • id can be any value, but must be unique in the consumers list
  • type must be opc-ua
  • opc_server is the address of your server (e.g. opc.tcp://x.x.x.x:4840/myopc/...)
  • opc_username to set a user name (optional)
  • opc_password to set a password (optional)
  • opc_application_uri can be used to set an application URI
  • opc_security_string can be used to set a security string
  • opc_session_timeout_ms (optional, default 60000) — OPC-UA session timeout in ms. Orphan sessions on the server expire after this delay.
  • mode (optional, default "polling") — "polling" or "subscription". Determines how data is consumed (see below).
  • opc_nodes lists each node you want to consume:
    • node_ids list of ids (e.g. ["ns=3;i=1"])
    • children true to also consume sub-nodes recursively
    • poll_interval frequency in ms (polling mode only)
    • Polling-mode batching (optional):
      • batch_size — nodes per batched OPC Read service call. If unset, each node is read individually in parallel (legacy behaviour).
      • max_concurrent_batches (default = number of nodes in the group) — caps how many batches run in parallel.
    • Subscription-mode keys (optional):
      • publishing_interval (ms, default 1000) — server-to-client publish cadence.
      • sampling_interval (ms, default 0 = server default) — server-side sampling per item.
      • queue_size (default 1) — server-side queue per monitored item.
  • prefix is an optional key to prefix all OPC-UA tag names (e.g. a factory or area name).
  • reconnect_interval base reconnect delay in ms when the connection drops (default 5000). Backoff grows exponentially with jitter, capped at 60 s, and never gives up — the consumer keeps retrying until the server returns or the consumer is stopped.
  • name_source (optional, default "name") — controls what the consumer reads as the raw tag name from each OPC node, before prefix and mapping are applied.
    • "name" (default) — use the node's displayName attribute.
    • "identifier" — use the OPC NodeId identifier verbatim (the part after the namespace/type wrapper).
  • mapping optional dictionary remapping a raw tag name to a Wizata hardware id. The key is matched against the same value that name_source resolves (displayName by default, or the node identifier when name_source is "identifier"). When mapping matches, prefix is not applied — the mapping value is taken as the complete hardware id.

Each opc_nodes entry runs as a dedicated routine and is consumed independently — different groups can use different poll_intervals in polling mode, or different publishing_intervals in subscription mode.
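
The interaction between prefix and mapping described above can be sketched as follows. This is a minimal illustration, not the edge's actual implementation: resolve_hardware_id is a hypothetical helper, and it assumes the prefix is applied by plain string concatenation.

```python
from typing import Dict, Optional


def resolve_hardware_id(
    raw_name: str,
    prefix: str = "",
    mapping: Optional[Dict[str, str]] = None,
) -> str:
    """Turn a raw tag name into a hardware id (hypothetical helper).

    raw_name is whatever name_source resolved: the displayName by default,
    or the NodeId identifier when name_source is "identifier".
    """
    if mapping and raw_name in mapping:
        # A mapping match is taken as the complete hardware id,
        # so the prefix is not applied in this case.
        return mapping[raw_name]
    # No mapping match: prepend the prefix (assumed plain concatenation).
    return f"{prefix}{raw_name}"


print(resolve_hardware_id("Temperature", prefix="myprefix_"))
print(resolve_hardware_id("Temperature", "myprefix_", {"Temperature": "plant_a_temp"}))
```

The first call prints myprefix_Temperature; the second prints plant_a_temp because the mapping takes precedence over the prefix.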

Polling mode (default)

Reads each group's nodes at the configured poll_interval. Stateless on the server side — best fit for unreliable or minimal-state OPC servers.

By default each node is read individually in parallel. For servers that struggle with many simultaneous reads, use batch_size + max_concurrent_batches to send a single batched OPC Read per chunk and cap how many batches run at once.

{
  "id": "opc-server-01",
  "type": "opc-ua",
  "opc_server": "opc.tcp://0.0.0.0:4840/freeopcua/server/",
  "mode": "polling",
  "opc_nodes": [
    {
      "node_ids": ["ns=3;i=1"],
      "children": true,
      "poll_interval": 1000
    },
    {
      "node_ids": ["ns=4;i=1"],
      "children": true,
      "poll_interval": 2000,
      "batch_size": 20,
      "max_concurrent_batches": 1
    }
  ],
  "prefix": "myprefix_",
  "reconnect_interval": 5000
}
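
To make the effect of batch_size and max_concurrent_batches more concrete, here is a rough sketch of how a group's nodes could be chunked and read. It is only an illustration under assumptions: split_into_batches, poll_once and the read_batch callback are hypothetical names, and the real consumer may schedule its reads differently.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional


def split_into_batches(node_ids: List[str], batch_size: Optional[int]) -> List[List[str]]:
    """Chunk node ids; without batch_size every node is its own read."""
    if not batch_size:
        return [[node_id] for node_id in node_ids]
    return [node_ids[i:i + batch_size] for i in range(0, len(node_ids), batch_size)]


def poll_once(
    node_ids: List[str],
    read_batch: Callable[[List[str]], list],
    batch_size: Optional[int] = None,
    max_concurrent_batches: Optional[int] = None,
) -> list:
    """Read all nodes once, running at most max_concurrent_batches reads in parallel."""
    batches = split_into_batches(node_ids, batch_size)
    if not batches:
        return []
    # Documented default: as many parallel batches as there are nodes in the group.
    workers = max_concurrent_batches or len(node_ids)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map keeps the results in the same order as the batches.
        return [value for batch in pool.map(read_batch, batches) for value in batch]


nodes = [f"ns=4;i={i}" for i in range(1, 46)]
print([len(b) for b in split_into_batches(nodes, 20)])  # [20, 20, 5]
```

With 45 nodes and batch_size 20, the group is read in three batched calls; setting max_concurrent_batches to 1 makes those three calls run one after another.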

Subscription mode

The OPC-UA server samples each item at sampling_interval and pushes notifications to the client every publishing_interval. Lower CPU/bandwidth on healthy servers, especially with slow-changing variables (only changes are reported). Adds a Subscription + N MonitoredItems to the server's session — best fit for healthy servers and large tag counts.

A watchdog probes the secure channel periodically so silent channel drops still trigger the reconnect logic.

{
  "id": "opc-server-01",
  "type": "opc-ua",
  "opc_server": "opc.tcp://0.0.0.0:4840/freeopcua/server/",
  "mode": "subscription",
  "opc_nodes": [
    {
      "node_ids": ["ns=3;i=1"],
      "children": true,
      "publishing_interval": 1000,
      "sampling_interval": 500,
      "queue_size": 1
    }
  ],
  "prefix": "myprefix_",
  "reconnect_interval": 5000
}
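
The reconnect behaviour configured by reconnect_interval (exponential backoff with jitter, capped at 60 s) can be pictured with the sketch below. The doubling factor and the shape of the jitter are assumptions made for illustration; only the base delay and the 60 s cap come from the description above, and reconnect_delay_ms is a hypothetical name.

```python
import random


def reconnect_delay_ms(attempt: int, base_ms: int = 5000, cap_ms: int = 60_000) -> float:
    """Delay before reconnect attempt number `attempt` (0-based)."""
    # Exponential growth from the base delay (doubling is an assumption),
    # never exceeding the 60 s cap.
    ceiling = min(cap_ms, base_ms * (2 ** attempt))
    # Random jitter in the upper half of the window (also an assumption),
    # so that many consumers do not all retry at the same instant.
    return random.uniform(ceiling / 2, ceiling)


for attempt in range(6):
    print(attempt, round(reconnect_delay_ms(attempt)))
```

Because the consumer never gives up, the delay simply stays in the capped window once the cap has been reached.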

Choosing a mode

| | Polling (default) | Subscription |
| Server CPU | Per-batch read each poll_interval | Server samples and pushes only on change |
| Server state | None (stateless reads) | One Subscription + N MonitoredItems on the session |
| Slow-changing variables | Reads every cycle (overhead) | Reports only on change (efficient) |
| Recovery on disconnect | Reconnect + resume polling | Subscriptions are recreated on reconnect |
| Best for | Unreliable / minimal-state servers | Healthy servers, large tag counts, slow-changing data |

If you're unsure, start with polling — it's the safest default. Switch to subscription per server if you need lower load and you've validated stability.

Writers

To write to OPC-UA, configure a writer:

  • id can be any value, but must be unique in the writers list
  • type must be opc-ua
  • opc_server server address (e.g. opc.tcp://x.x.x.x:4840/myopc/...)
  • opc_username / opc_password for authentication (optional)
  • opc_application_uri application URI (optional)
  • opc_security_string security string (optional)
  • opc_session_timeout (optional, default 60000) — OPC-UA session timeout in ms.
  • datapoints dictionary mapping the tag name as emitted by a consumer or pipeline to the OPC node identifier.

The writer keeps a persistent session to the server and reuses it across writes. On a transport failure it reconnects once and retries; persistent failures bubble up so the platform can react.

{
  "id": "opc-server-01",
  "type": "opc-ua",
  "opc_server": "opc.tcp://0.0.0.0:4840/freeopcua/server/",
  "datapoints": {
    "my_tag_01": "ns=1;s=t|opc_tag_name",
    "my_tag_02": "ns=1;i=4"
  }
}
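
The "reconnect once and retry" behaviour of the writer can be summarised by the pattern below. This is a generic sketch, not the writer's code: write_with_one_retry and its two callbacks are hypothetical, and using ConnectionError to represent a transport failure is an assumption.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def write_with_one_retry(write: Callable[[], T], reconnect: Callable[[], None]) -> T:
    """Run write(); on a transport failure reconnect once and try again."""
    try:
        return write()
    except ConnectionError:
        # First failure: rebuild the session, then retry exactly once.
        reconnect()
        # A second failure is not caught here, so it bubbles up to the caller.
        return write()


attempts = {"count": 0}


def flaky_write() -> str:
    # Fails on the first call only, to simulate a dropped session.
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise ConnectionError("session dropped")
    return "written"


print(write_with_one_retry(flaky_write, lambda: None))  # written
```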

MQTT

You can configure the message client to subscribe to topics on an MQTT broker reachable from the edge.

  • id can be any value, but must be unique in the consumers list
  • type must be mqtt
  • host hostname or IPv4 address of the MQTT broker
  • port port of the MQTT broker (default 1883, or 8883 for TLS)
  • tls boolean to enable TLS (default false)
  • username / password credentials, if the broker requires authentication (optional)
  • client_id identifier used by the broker to track this consumer's session (optional — generated by paho when omitted)
  • keepalive interval in seconds between client → broker keepalive pings (optional, default 60)
  • topics list of topics to subscribe to. Each entry is either a string or { "topic": <pattern>, "qos": <0|1|2> } (default qos 1). Wildcards follow the standard MQTT rules: + matches one segment, # matches the rest.
  • payload_format how the payload bytes are handed to the function (default "json"):
    • json — payload is decoded with json.loads before reaching the function. Without a function, the parsed dict is forwarded to the platform as a single message (same default as the Rabbit MQ consumer).
    • raw — payload is forwarded to the function as raw bytes (use this when the broker pushes single floats as ASCII, vendor binary frames, Sparkplug-B, etc.).
  • function if the payload doesn't already match the Wizata message format, point to a custom python function that returns the platform-shaped list of messages. The function signature receives both the topic and the payload: function(topic: str, payload) -> list[dict]. See Connect your data to Wizata for the platform message format.
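
To check which topics a pattern in topics would match, the wildcard rules above (+ matches one segment, # matches the rest) can be tried out locally with a small helper. This is a simplified sketch: topic_matches is a hypothetical function written for illustration, and it ignores special cases such as topics starting with $.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True when an MQTT topic matches a subscription pattern."""
    pattern_parts = pattern.split("/")
    topic_parts = topic.split("/")
    for i, part in enumerate(pattern_parts):
        if part == "#":
            # Multi-level wildcard: matches everything from this level on.
            return True
        if i >= len(topic_parts):
            # The pattern has more levels than the topic.
            return False
        if part != "+" and part != topic_parts[i]:
            # A literal level must match exactly; "+" accepts any single level.
            return False
    # Without "#", pattern and topic must have the same number of levels.
    return len(pattern_parts) == len(topic_parts)


print(topic_matches("site42/area1/+/measured_temp", "site42/area1/pump7/measured_temp"))  # True
print(topic_matches("site42/area1/+/measured_temp", "site42/area1/pump7/a/measured_temp"))  # False
print(topic_matches("site42/#", "site42/area1/pump7/coolant_temp"))  # True
```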

Here's a sample consumer configuration for MQTT:

{
  "id": "mqtt-01",
  "type": "mqtt",
  "host": "broker.example.com",
  "port": 1883,
  "tls": false,
  "username": "edge-user",
  "password": "xxxxxxxx",
  "client_id": "edge-site42",
  "topics": [
    { "topic": "site42/area1/+/measured_temp", "qos": 1 },
    { "topic": "site42/area1/+/coolant_temp", "qos": 1 }
  ],
  "payload_format": "json",
  "function": {
    "filepath": "functions/mqtt_parser.py",
    "function_name": "parse_mqtt_message"
  }
}

A minimal parse_mqtt_message for the topic shape above (last segment of the topic is the datapoint name, payload is a JSON object holding value and ts):

def parse_mqtt_message(topic, payload):
    return [{
        "Timestamp": payload["ts"],
        "HardwareId": topic.rsplit("/", 1)[-1],
        "SensorValue": payload["value"],
    }]
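
For payload_format "raw", the function receives bytes instead of a parsed dict. As a sketch of the "single floats as ASCII" case, a function could look like the one below. parse_raw_float is a hypothetical name, and stamping the message with the reception time is an assumption made because such a payload carries no timestamp of its own.

```python
from datetime import datetime, timezone


def parse_raw_float(topic: str, payload: bytes) -> list:
    """Convert an ASCII-encoded float payload into one platform message."""
    # e.g. b"21.5\n" -> 21.5
    value = float(payload.decode("ascii").strip())
    # No timestamp in the payload, so the reception time (UTC) is used here.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f") + "+00:00"
    return [{
        "Timestamp": timestamp,
        # Last segment of the topic is the datapoint name, as in the example above.
        "HardwareId": topic.rsplit("/", 1)[-1],
        "SensorValue": value,
    }]


print(parse_raw_float("site42/area1/pump7/measured_temp", b"21.5\n"))
```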

Modbus

You can configure the message client to pull data from a Modbus server accessible to the edge.

  • id can be any value, but must be unique in the consumers list
  • type must be modbus
  • ip_address IPv4 address of the Modbus server
  • port TCP port of the Modbus server
  • poll_rate mandatory number of seconds between two polls
  • unit_id unit identifier (slave id) of your Modbus device
  • datapoints dictionary where each key is the desired datapoint name and each value has the following format:
    • address register address as an integer (convert it from hexadecimal if needed), starting at 0
    • quantity number of 16-bit registers to fetch (e.g. 2 for a 32-bit value)
    • pack format used to pack the registers (e.g. "<HH" for little-endian 32-bit data)
    • unpack format used to unpack the data (e.g. "<f" for a float)

The Modbus client uses the struct.pack and struct.unpack functions of Python's struct library. Refer to the documentation to find the proper type and format: https://docs.python.org/3/library/struct.html

Here is an example Modbus configuration, reading a 32-bit float from address 0 and a signed 16-bit integer from address 2, both in little-endian format:

{
  "id": "modbus-01",
  "type": "modbus",
  "ip_address": "10.0.0.1",
  "port": 502,
  "poll_rate": 60,
  "unit_id": 1,
  "datapoints": {
    "dp_modbus_01": {
      "address": 0,
      "quantity": 2,
      "pack": "<HH",
      "unpack": "<f"
    },
    "mdb_test_test2": {
      "address": 2,
      "quantity": 1,
      "pack": "<H",
      "unpack": "<h"
    }
  }
}
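
To check a pack/unpack pair before deploying it, the conversion can be reproduced locally with the struct module. The sketch below assumes the registers are packed with the pack format and the resulting bytes are then unpacked with the unpack format; decode_registers is a hypothetical helper, not part of the edge.

```python
import struct


def decode_registers(registers, pack_format: str, unpack_format: str):
    """Pack raw 16-bit register values into bytes, then unpack them as one value."""
    raw = struct.pack(pack_format, *registers)
    return struct.unpack(unpack_format, raw)[0]


# Two registers holding the 32-bit float 1.0, low word first.
print(decode_registers([0x0000, 0x3F80], "<HH", "<f"))  # 1.0
# One register holding a signed 16-bit integer.
print(decode_registers([0xFFFF], "<H", "<h"))  # -1
```

If a value comes out wrong (for example a very large or very small float), the word order or byte order of the device probably differs from the formats used, and the pack and unpack strings need to be adjusted.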

Siemens S7 - Snap7

You can configure the message client to pull data from a Siemens S7 server accessible to the edge.

Before configuring the connector, make sure your Siemens S7 is reachable on a specific IP address and port, and that PUT/GET communication is permitted on the PLC.

  • id can be any value, but must be unique in the consumers list
  • type must be snap7
  • address IPv4 address of the Snap7 server
  • tcp_port TCP port of the Snap7 server
  • rack rack number of the PLC
  • slot slot number of the PLC
  • polls list of polls to make; each poll is a dict and creates a thread that polls the data.
    • db_number DB number to read
    • start_offset offset in bytes at which the read starts
    • size number of bytes to read from the DB, starting at the offset
    • datapoints structure of each tag within the read, as a list of dicts
      • type type of the data, currently supports dword, real, word, bool, int
      • index position in the read bytes array
      • name tag name you want to push to Wizata

Here's an example of a Snap7 configuration:

{
  "id": "snap7-01",
  "type": "snap7",
  "address": "192.168.0.100",
  "tcp_port": 102,
  "rack": 0,
  "slot": 1,
  "frequency_ms": 5000,
  "polls": [
    {
      "db_number": 1,
      "start_offset": 0,
      "size": 64,
      "datapoints": [
        {
          "type": "dword",
          "index": 0,
          "name": "sample_dword"
        },
        {
          "type": "real",
          "index": 4,
          "name": "sample_real"
        }
      ]
    }
  ]
}
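
To see how type and index relate to the bytes returned by a poll, the read buffer can be decoded by hand. The sketch below is an illustration only: decode_datapoints is a hypothetical helper, it relies on S7 data blocks storing values in big-endian order, it treats index as a byte offset into the read buffer, and it leaves out bool because the bit addressing is not described here.

```python
import struct

# struct formats for the S7 types handled in this sketch (big-endian).
S7_FORMATS = {
    "dword": ">I",  # unsigned 32-bit
    "real": ">f",   # 32-bit float
    "word": ">H",   # unsigned 16-bit
    "int": ">h",    # signed 16-bit
}


def decode_datapoints(data: bytes, datapoints: list) -> dict:
    """Decode each datapoint from the bytes returned by one poll."""
    values = {}
    for datapoint in datapoints:
        fmt = S7_FORMATS[datapoint["type"]]
        # index is taken as the byte offset of the value inside the buffer.
        values[datapoint["name"]] = struct.unpack_from(fmt, data, datapoint["index"])[0]
    return values


# Simulated 64-byte read: a dword at offset 0 and a real at offset 4.
buffer = struct.pack(">If", 42, 1.5).ljust(64, b"\x00")
datapoints = [
    {"type": "dword", "index": 0, "name": "sample_dword"},
    {"type": "real", "index": 4, "name": "sample_real"},
]
print(decode_datapoints(buffer, datapoints))  # {'sample_dword': 42, 'sample_real': 1.5}
```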

Rabbit MQ

You can configure the message client to pull data from an external Rabbit MQ queue.

  • id can be any value, but must be unique in the consumers list, and type must be rabbitmq
  • Configure your connection to Rabbit MQ:
    • RQ_HOST - defines your server domain name
    • RQ_PORT - defines the port to use
    • RQ_USER / RQ_PASS - defines the user name and password
    • RQ_TLS - optional, set it if you do not want to use SSL or want a TLS version other than TLS 1.2
    • RQ_VHOST - virtual host name
  • queue - defines the queue to use, if multiple you need to create different consumers.
  • function - if necessary use a custom python function to convert your data to Wizata message format (see Connect your data to Wizata to understand the supported format).

Here's a sample consumer configuration for Rabbit MQ:

{
  "id": "a-unique-id",
  "type": "rabbitmq",
  "RQ_HOST": "server.domain.name",
  "RQ_PORT": 5672,
  "RQ_USER": "your-user",
  "RQ_PASS": "xxxxxxxxx",
  "RQ_TLS": "v1_1",
  "RQ_VHOST": "vhost",
  "queue": "your-queue",
  "function": {
    "filepath": "functions/sample_t.py",
    "function_name": "sample_t"
  }
}
📘

Troubleshooting note

The RQ_PORT value may vary depending on your installation.

Functions

A custom function can be used to transform your data into the Wizata message format. Some connectors, such as OPC-UA and Modbus, don't require a function because the protocol itself defines the data format. Please read Time-series data - format & types to learn more about Wizata formats.

Custom functions can be used with the following connectors:

  • Rabbit MQ
  • MQTT

Let's take an example:

  • if your data looks like:
{'t': 1728920878.798966, 'f': 0.6591128532976067, 'h': 'rq-custom-tag-01'}
{'t': 1728920883.817512, 'f': 0.6972545363650098, 'h': 'rq-custom-tag-01'}
{'t': 1728920888.83555, 'f': 0.6168179285479277, 'h': 'rq-custom-tag-01'}
{'t': 1728920893.85878, 'f': 0.5139981833045464, 'h': 'rq-custom-tag-01'}
  • you can use the following code:
def sample_t(payload: dict) -> list:
    from datetime import datetime, timezone

    message = {
        "Timestamp": datetime.fromtimestamp(payload["t"], tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f") + "+00:00",
        "HardwareId": payload["h"],
        "SensorValue": float(payload["f"])
    }
    return [message]
❗️

Important

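The function must take exactly one parameter, a dict corresponding to the message received, and must return a list of messages, each message being a dict. The MQTT consumer is the exception: its function also receives the topic, as described in the MQTT section.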
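
Before deploying a custom function, its return value can be checked locally against these rules. The helper below is a minimal sketch: check_messages is a hypothetical name, and the required keys are simply the three used in the examples on this page, so the full platform format may accept more than this check covers.

```python
# Keys used by the message examples on this page.
REQUIRED_KEYS = {"Timestamp", "HardwareId", "SensorValue"}


def check_messages(messages) -> None:
    """Raise an error if the value returned by a custom function is malformed."""
    if not isinstance(messages, list):
        raise TypeError("the function must return a list of messages")
    for message in messages:
        if not isinstance(message, dict):
            raise TypeError("each message must be a dict")
        missing = REQUIRED_KEYS - message.keys()
        if missing:
            raise ValueError(f"message is missing keys: {sorted(missing)}")


check_messages([{
    "Timestamp": "2024-10-14T15:47:58.798966+00:00",
    "HardwareId": "rq-custom-tag-01",
    "SensorValue": 0.6591128532976067,
}])
print("messages look valid")
```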