Back to top

Dojot data broker API

This document describes the APIs using data-broker as a standalone process. If used within dojot, the endpoints might be changed a bit. Check dojot’s documentation to check all of them.

Subject

In order to allow clients to consume dojot events on their own pace, dojot exposes (both internally and externally) apache kafka streams. To manage the set of topics/partitions exposed by the kafka cluster, the data-broker component functions as a gatekeeper to the queues, configuring the needed streams on kafka and setting its accessibility parameters.

In order to subscribe to a given set of events, a client uses a subject. A subject is a shared human-readable identifier for the data stream one wishes to produce or consume from. From the data-broker point of view, all subjects are equal and are just a representation of a set of events to be put on their own stream. To check the list of available subjects to consume from (as an application) please check the relevant service documentation (e.g. device-manager or iotagent).

Subject Profiles

Subject profiles are used to define how topics will be created when requesting one for a particular subject. For instance, let’s say that current deploy must prioritize data sent by devices (received, processed and published by IoT agents). In order to do this, the system administrator might want to create a profile to device-data subject, which is the one used by IoT agent, adding more partitions to it. Therefore, whenever a new IoT agent requests a topic to DataBroker, it will create a new topic (if it doesn’t exist already) this new configuration. The administrator can set different profiles to different tenants associated to the same subject.

There are two ways to configure a subject profile:

  1. Setting a default value for number of partitions and replication factor. Using this scheme will distribute all partitions evenly to all Kafka brokers in the cluster. The message format is as follows:
    {
        "num_partitions": 4,
        "replication_factor" : 2
    }

This message indicates that the topic will have 4 partitions and each one will have two replicas.

  1. Setting a mapping of partitions and brokers: this scheme allows an administrator to set the broker which will host a particular partitions (first element is the preferred partition leader) and its replicas. The message format is as follows:
    {
        "replica_assignment": {
            "1": [ 1, 2, 3],
            "2": [ 4, 5, 6]
        }
    }

This message indicates that the topic will have 2 partitions. The first partition will be hosted by brokers 1 (preferred partition leader), 2 and 3 and the second one will be hosted by brokers 4 (preferred partition leader), 5 and 6.

This mapping will be used as a suggestion to Kafka brokers. As noted in its documentation, it is not guaranteed that this mapping will be exactly the configured scenario.

The schemes are mutually exclusive. Also, these APIs are accessible only by users in admin tenant (only system administrator can use them).

Default values are one partition per topic, no replication.

Retrieve subject profile
GET/topic/{subjectid}/profile

Retrieve currently configured topic profile for a particular subject. Should any internal error occurs, a 500 status code is returned with a proper error description. These errors could be related to REDIS access or some other service dependency.

Example URI

GET http://localhost:80/topic/devicedata/profile
URI Parameters
HideShow
subjectid
string (required) Example: devicedata

Subject whose topic is to be retrieved.

Request
HideShow
Headers
Authorization: Bearer JWT
Response  200
HideShow
Headers
Content-Type: application/json; charset=utf-8
Body
{
  "special-user": {
    "replica_assignment": {
      "1": [
        1,
        2,
        3
      ],
      "2": [
        4,
        5,
        6
      ]
    }
  },
  "*": {
    "num_partitions": 2,
    "replication_factor": 1
  }
}
Response  401
HideShow
Headers
Content-Type: application/json; charset=utf-8
Body
{
  "error": "Missing mandatory authorization header in profile request"
}
Response  404
HideShow
Headers
Content-Type: application/json; charset=utf-8
Body
{
  "message": "Could not find profiles for this subject"
}
Response  500
HideShow
Headers
Content-Type: application/json; charset=utf-8
Body
{
  "message": "error",
  "details": "internal error"
}

Create a new subject profile
POST/topic/{subjectid}/profile

Example URI

POST http://localhost:80/topic/devicedata/profile
URI Parameters
HideShow
subjectid
string (required) Example: devicedata

Subject whose topic is to be retrieved.

Request
HideShow
Headers
Authorization: Bearer JWT
Body
{
  "special-user": {
    "replica_assignment": {
      "1": [
        1,
        2,
        3
      ],
      "2": [
        4,
        5,
        6
      ]
    }
  },
  "*": {
    "num_partitions": 2,
    "replication_factor": 1
  }
}
Response  200
HideShow
Headers
Content-Type: application/json; charset=utf-8
Body
{
  "message": "Set configs successfully"
}
Response  401
HideShow
Headers
Content-Type: application/json; charset=utf-8
Body
{
  "error": "Missing mandatory authorization header in profile request"
}

Edit Profile

Edit profile config for given tenant
PUT/topic/{subjectid}/profile/{tenant}

Example URI

PUT http://localhost:80/topic/devicedata/profile/special
URI Parameters
HideShow
subjectid
string (required) Example: devicedata

Subject whose profiles is to be retrieved.

tenant
string (required) Example: special

user (required, string) - Tenant that will have configs changed or created

Request
HideShow
Headers
Authorization: Bearer JWT
Body
{
  "special-user": {
    "replica_assignment": {
      "1": [
        1,
        2,
        3
      ],
      "2": [
        4,
        5,
        6
      ],
      "3": [
        7,
        8,
        9
      ]
    }
  }
}
Response  200
HideShow
Headers
Content-Type: application/json; charset=utf-8
Body
{
  "message": "Configs edited/created successfully"
}
Response  401
HideShow
Headers
Content-Type: application/json; charset=utf-8
Body
{
  "error": "Missing mandatory authorization header in profile request"
}

Subject

Request kafka topic for given subject
GET/topic/{subjectid}

Request a kafka topic ID for realtime event consumption

Example URI

GET http://localhost:80/topic/device%2Ddata
URI Parameters
HideShow
subjectid
string (required) Example: device%2Ddata

Subject whose topic is to be retrieved.

Request
HideShow
Headers
Authorization: Bearer JWT
Response  200
HideShow
Headers
Content-Type: application/json; charset=utf-8
Body
{
  "topic": "c9b2c688-9e40-4032-877a-3d262acba9d0"
}
Response  401
HideShow
Headers
Content-Type: application/json; charset=utf-8
Body
{
  "error": "missing mandatory authorization header in get request"
}
Response  500
HideShow
Headers
Content-Type: application/json; charset=utf-8
Body
{
  "error": "failed to process topic"
}

Websockets

For websocket based real time consumption of events, a two-step procedure is required on the client. First, the user requests a session token, using a valid dojot JWT token to present itself to the platform. Should the user be able to access the feature, data-broker will then generate a one-time token to be used by the client when establising the websocket connection, through a querystring param.

Websocket connections that do not present a valid previously agreed one-time token will be dropped.

Socket-io based realtime events

Established connections will have two types of events defined: wildcard (all) and per device.

As a client, to be notified of changes to a specific device’s attribute values, subscribe to the device id of the relevant device:

/*
 * Where:
 *   target is the full base url to dojot
 *   token is the one-time access token retrieved by GET /socketio
 */
var socket = socketio(target, {'query': "token=" + token, 'transports': ['websocket']});
socket.on('{deviceidhere}', (data) => {
    // handle data here
});

To be notified of changes to any device, subscribe to the wildcard message type all:

/*
 * Where:
 *   target is the full base url to dojot
 *   token is the one-time access token retrieved by GET /socketio
 */
var socket = socketio(target, {'query': "token=" + token, 'transports': ['websocket']});
socket.on('all', (data) => {
    // handle data here
});

All messages received through device-data and dojot.notification subjects will be redirected to Socket-io. To check their format and content, check iotagent-mosca documentation.

In order to receive data selectively, the following events can be subscribed:

  • all: all device events from a particular tenant

  • device-id: only events for that particular device.

  • notifications: only notifications are going to be sent. You can sent back to data-broker filters. In order to receive notifications using a socket io connection, a subject=dojot.notifications parameter should be added to the query string used to create the socket.io connection.

Request authentication token
GET/socketio

Request a one-time token for connection establishment.

Example URI

GET http://localhost:80/socketio
Request
HideShow
Headers
Authorization: Bearer JWT
Response  200
HideShow
Headers
Content-Type: application/json; charset=utf-8
Body
{
  "token": "f3fa3200-355e-4c64-b300-64cef69b0576"
}
Response  401
HideShow
Headers
Content-Type: application/json; charset=utf-8
Body
{
  "error": "missing mandatory authorization header in socketio request"
}

Log configuration

Retrieve/request log level changes

Log level configuration [log/config]

Retrieve current log level [GET]

  • Request

    • Headers

      Authorization: Bearer JWT
  • Response 200 (application/json; charset=utf-8)

    {"level":"debug"}

Set current log level [PUT]

  • Request

    • Headers

      Authorization: Bearer JWT
    • Body

      {
            "level": "warn"
        }
  • Response 200 (text/html; charset=utf-8)

    "Level of debugger is set to warn"

Generated by aglio on 04 Sep 2020