REST APIs

Below we describe the REST APIs available for user to interact with system. The NebulaStream REST API is versioned, with specific versions being queryable by prefixing the url with the version prefix. Prefixes are always of the form v[version_number]. For example, to access version 1 of /foo/bar one would query /v1/nes/foo/bar.

Querying unsupported/non-existing versions will return a 404 error.

There exist several async operations among these APIs, e.g. submit a job. These async calls will return a triggerid to identify the operation later. You need to use the triggerid to query for the status of the operation.

Query

Here we describe the available endpoints used for submitting and interacting with a user query.

Submitting User Query

Submitting user query for execution.

API: /query/execute-query \ Verb: POST \ Response Code: 200 OK

Example:

Request:

{
"userQuery":"Query::from(\"default_logical\").sink(PrintSinkDescriptor::create());",
"strategyName": "BottomUp"
}

Response:

{"queryId": "system_generate_uuid"}

Submitting user query as a protobuf Object:

API: /query/execute-query-ex
Verb: POST
Response Code: 200 OK

Example:

Request: A Protobuf encoded QueryPlan, query String and PlacementStrategy.

Response: {"queryId": "system_generate_uuid"}

Getting Execution Plan

Getting the execution plan for the user query.

API: /query/execution-plan?queryId=<query id> Verb: GET\ Response Code: 200 OK

Example:

Request:

/query/execution-plan?queryId=1

Response:

{
"executionNodes":[{
                  "ScheduledQueries":[{
                                     "queryId":"1",
                                     "querySubPlans":[{
                                                     "operator":"operator",
                                                     "querySubPlanId":"querySubPlanId"
                                                     }]
                                     }],  
                  "executionNodeId":"1",
                  "topologyNodeId":"1",
                  "topologyNodeIpAddress":"topologyNodeIpAddress"
                  }]
}

Get Query plan

Get query plan for the user query.

API: /query/query-plan?queryId=<query id> Verb: GET\ Response Code: 200 OK

Example:

Request:

/query/query-plan?queryId=1

Response:

{
"edges":[{
        "source":"source",
        "target":"target"}],
"nodes":[{
         "id":"id",
         "name":"name",
         "nodeType":"nodeType"
        }]
}

Delete User Query

To stop a user submitted query.

API: /query/stop-query?queryId=<query id> Verb: DELETE\ Response Code: 200 OK

Example:

Request:

/query/stop-query?queryId=1

Response:

{"success": "true"}

Topology

Getting NebulaStream Topology graph

To get the NebulaStream topology graph as JSON.

API: /topology\ Verb: GET\ Response Code: 200 OK

Example:

Response:

{
    "edges":[{
             "source": "source",
             "target": "target"
            }],
    "nodes": [{
             "available_resources": "available_resources",
             "id": "nodeId",
             "ip_address": "node_ip_address"
             }]
}

Add parent child relationship for topology nodes

The parent and child Id can be obtained by finding the IP address in the topology graph and noting the associated nodeId.

API: /topology/addParent \ Verb: POST \ Response Code: 200 OK

Example:

Request:

{
"parentId":"1",
"childId": "2"
}

Response:

{"Success": "true"}

Remove parent child relationship for topology nodes

The parent and child Id can be obtained by finding the IP address in the topology graph and noting the associated nodeId.

API: /topology/removeParent \ Verb: POST \ Response Code: 200 OK

Example:

Request:

{
"parentId":"1",
"childId": "2"
}

Response:

{"Success": "true"}

Monitoring

Getting NebulaStream Monitoring Data - All Nodes

To get the NebulaStream monitoring data as JSON from all nodes in the topology. The first numbers in the JSON dict correspond to the IDs of the nodes in the topology.

API: /monitoring/metrics\ Verb: GET\ Response Code: 200 OK

Example:

Response:

{
    "1": {
        "cpu": {
            "CORE_0": {
                "GUEST": 0,
                "GUESTNICE": 0,
                "IDLE": 2670438,
                "IOWAIT": 3211,
                "IRQ": 0,
                "NICE": 38,
                "SOFTIRQ": 65181,
                "STEAL": 0,
                "SYSTEM": 38856,
                "USER": 180144
            },
            "CORE_1": {
                "GUEST": 0,
                "GUESTNICE": 0,
                "IDLE": 2671470,
                "IOWAIT": 2947,
                "IRQ": 0,
                "NICE": 192,
                "SOFTIRQ": 30243,
                "STEAL": 0,
                "SYSTEM": 38958,
                "USER": 180216
            },
            [...]
            "CORE_7": {
                "GUEST": 0,
                "GUESTNICE": 0,
                "IDLE": 2679732,
                "IOWAIT": 3004,
                "IRQ": 0,
                "NICE": 45,
                "SOFTIRQ": 6604,
                "STEAL": 0,
                "SYSTEM": 39230,
                "USER": 172534
            },
            "NUM_CORES": 8,
            "TOTAL": {
                "GUEST": 0,
                "GUESTNICE": 0,
                "IDLE": 21373871,
                "IOWAIT": 24883,
                "IRQ": 0,
                "NICE": 981,
                "SOFTIRQ": 155546,
                "STEAL": 0,
                "SYSTEM": 321284,
                "USER": 1411760
            }
        },
        "disk": {
            "F_BAVAIL": 26742328,
            "F_BFREE": 29946424,
            "F_BLOCKS": 62733020,
            "F_BSIZE": 4096,
            "F_FRSIZE": 4096
        },
        "memory": {
            "BUFFER_RAM": 1684271104,
            "FREE_HIGH": 0,
            "FREE_RAM": 17513349120,
            "FREE_SWAP": 2147479552,
            "LOADS_15MIN": 122336,
            "LOADS_1MIN": 80704,
            "LOADS_5MIN": 146432,
            "MEM_UNIT": 1,
            "PROCS": 1666,
            "SHARED_RAM": 116293632,
            "TOTAL_HIGH": 0,
            "TOTAL_RAM": 41721716736,
            "TOTAL_SWAP": 2147479552
        },
        "network": [
            {
                "R_BYTES": 0,
                "R_COMPRESSED": 0,
                "R_DROP": 0,
                "R_ERRS": 0,
                "R_FIFO": 0,
                "R_FRAME": 0,
                "R_MULTICAST": 0,
                "R_PACKETS": 0,
                "T_BYTES": 0,
                "T_CARRIER": 0,
                "T_COLLS": 0,
                "T_COMPRESSED": 0,
                "T_DROP": 0,
                "T_ERRS": 0,
                "T_FIFO": 0,
                "T_PACKETS": 0
            },
            [...],
            {
                "R_BYTES": 0,
                "R_COMPRESSED": 0,
                "R_DROP": 0,
                "R_ERRS": 0,
                "R_FIFO": 0,
                "R_FRAME": 0,
                "R_MULTICAST": 0,
                "R_PACKETS": 0,
                "T_BYTES": 0,
                "T_CARRIER": 0,
                "T_COLLS": 0,
                "T_COMPRESSED": 0,
                "T_DROP": 0,
                "T_ERRS": 0,
                "T_FIFO": 0,
                "T_PACKETS": 0
            }
        ]
    },
    "2": {
        "cpu": {
            "CORE_0": {
                "GUEST": 0,
                "GUESTNICE": 0,
                "IDLE": 2670438,
                "IOWAIT": 3211,
                "IRQ": 0,
                "NICE": 38,
                "SOFTIRQ": 65181,
                "STEAL": 0,
                "SYSTEM": 38856,
                "USER": 180144
            },
            [...],
            "CORE_7": {
                "GUEST": 0,
                "GUESTNICE": 0,
                "IDLE": 2679732,
                "IOWAIT": 3004,
                "IRQ": 0,
                "NICE": 45,
                "SOFTIRQ": 6604,
                "STEAL": 0,
                "SYSTEM": 39230,
                "USER": 172534
            },
            "NUM_CORES": 8,
            "TOTAL": {
                "GUEST": 0,
                "GUESTNICE": 0,
                "IDLE": 21373871,
                "IOWAIT": 24883,
                "IRQ": 0,
                "NICE": 981,
                "SOFTIRQ": 155546,
                "STEAL": 0,
                "SYSTEM": 321284,
                "USER": 1411760
            }
        },
        "disk": {
            "F_BAVAIL": 26742328,
            "F_BFREE": 29946424,
            "F_BLOCKS": 62733020,
            "F_BSIZE": 4096,
            "F_FRSIZE": 4096
        },
        "memory": {
            "BUFFER_RAM": 1684271104,
            "FREE_HIGH": 0,
            "FREE_RAM": 17513349120,
            "FREE_SWAP": 2147479552,
            "LOADS_15MIN": 122336,
            "LOADS_1MIN": 80704,
            "LOADS_5MIN": 146432,
            "MEM_UNIT": 1,
            "PROCS": 1666,
            "SHARED_RAM": 116293632,
            "TOTAL_HIGH": 0,
            "TOTAL_RAM": 41721716736,
            "TOTAL_SWAP": 2147479552
        },
        "network": [
            {
                "R_BYTES": 0,
                "R_COMPRESSED": 0,
                "R_DROP": 0,
                "R_ERRS": 0,
                "R_FIFO": 0,
                "R_FRAME": 0,
                "R_MULTICAST": 0,
                "R_PACKETS": 0,
                "T_BYTES": 0,
                "T_CARRIER": 0,
                "T_COLLS": 0,
                "T_COMPRESSED": 0,
                "T_DROP": 0,
                "T_ERRS": 0,
                "T_FIFO": 0,
                "T_PACKETS": 0
            },
            [...],
            {
                "R_BYTES": 0,
                "R_COMPRESSED": 0,
                "R_DROP": 0,
                "R_ERRS": 0,
                "R_FIFO": 0,
                "R_FRAME": 0,
                "R_MULTICAST": 0,
                "R_PACKETS": 0,
                "T_BYTES": 0,
                "T_CARRIER": 0,
                "T_COLLS": 0,
                "T_COMPRESSED": 0,
                "T_DROP": 0,
                "T_ERRS": 0,
                "T_FIFO": 0,
                "T_PACKETS": 0
            }
        ]
    }
}

Getting NebulaStream Monitoring Data Via Node-ID

To get the NebulaStream monitoring data for a particular node.

API: /monitoring/metrics/<NODE_ID>\ Verb: GET\ Response Code: 200 OK

Example:

Response:

{
    "cpu": {
        "CORE_0": {
            "GUEST": 0,
            "GUESTNICE": 0,
            "IDLE": 2767270,
            "IOWAIT": 3284,
            "IRQ": 0,
            "NICE": 38,
            "SOFTIRQ": 70028,
            "STEAL": 0,
            "SYSTEM": 41973,
            "USER": 188192
        },
        [...],
        "CORE_7": {
            "GUEST": 0,
            "GUESTNICE": 0,
            "IDLE": 2776496,
            "IOWAIT": 3132,
            "IRQ": 0,
            "NICE": 45,
            "SOFTIRQ": 6642,
            "STEAL": 0,
            "SYSTEM": 42672,
            "USER": 180047
        },
        "NUM_CORES": 8,
        "TOTAL": {
            "GUEST": 0,
            "GUESTNICE": 0,
            "IDLE": 22147970,
            "IOWAIT": 25769,
            "IRQ": 0,
            "NICE": 982,
            "SOFTIRQ": 164849,
            "STEAL": 0,
            "SYSTEM": 345240,
            "USER": 1478098
        }
    },
    "disk": {
        "F_BAVAIL": 26733637,
        "F_BFREE": 29937733,
        "F_BLOCKS": 62733020,
        "F_BSIZE": 4096,
        "F_FRSIZE": 4096
    },
    "memory": {
        "BUFFER_RAM": 1689235456,
        "FREE_HIGH": 0,
        "FREE_RAM": 17270108160,
        "FREE_SWAP": 2147479552,
        "LOADS_15MIN": 90752,
        "LOADS_1MIN": 87328,
        "LOADS_5MIN": 85184,
        "MEM_UNIT": 1,
        "PROCS": 1645,
        "SHARED_RAM": 123133952,
        "TOTAL_HIGH": 0,
        "TOTAL_RAM": 41721716736,
        "TOTAL_SWAP": 2147479552
    },
    "network": [
        {
            "R_BYTES": 0,
            "R_COMPRESSED": 0,
            "R_DROP": 0,
            "R_ERRS": 0,
            "R_FIFO": 0,
            "R_FRAME": 0,
            "R_MULTICAST": 0,
            "R_PACKETS": 0,
            "T_BYTES": 0,
            "T_CARRIER": 0,
            "T_COLLS": 0,
            "T_COMPRESSED": 0,
            "T_DROP": 0,
            "T_ERRS": 0,
            "T_FIFO": 0,
            "T_PACKETS": 0
        },
        [...],
        {
            "R_BYTES": 0,
            "R_COMPRESSED": 0,
            "R_DROP": 0,
            "R_ERRS": 0,
            "R_FIFO": 0,
            "R_FRAME": 0,
            "R_MULTICAST": 0,
            "R_PACKETS": 0,
            "T_BYTES": 0,
            "T_CARRIER": 0,
            "T_COLLS": 0,
            "T_COMPRESSED": 0,
            "T_DROP": 0,
            "T_ERRS": 0,
            "T_FIFO": 0,
            "T_PACKETS": 0
        }
    ]
}

Collected Metrics

CPU

The metrics are read in Linux from /proc/stat/. The units of the CPU metrics are measured is jiffies.

%usr – % CPU usage at the user level
%nice – % CPU usage for user processes labeled “nice”
%sys – % CPU usage at the system (Linux kernel) level
%iowait – % CPU usage idling waiting on a disk read/write
%irq – % CPU usage handling hardware interrupts
%soft – % CPU usage handing software interrupts
%steal – % CPU usage being forced to wait for a hypervisor handling other virtual processors
%guest – % CPU usage spent running a virtual processor
%idle – % CPU usage on idle time (no processes, and not waiting on a disk read/write)

Memory

The metrics are obtained via the <sysinfo> library, which reads underneath /proc/meminfo

loads[3];  /* 1, 5, and 15 minute load averages */
totalram;  /* Total usable main memory size */
freeram;   /* Available memory size */
sharedram; /* Amount of shared memory */
bufferram; /* Memory used by buffers */
totalswap; /* Total swap space size */
freeswap;  /* Swap space still available */
procs;    /* Number of current processes */
totalhigh; /* Total high memory size */
freehigh;  /* Available high memory size */
mem_unit;   /* Memory unit size in bytes */

Disk

The metrics are obtained via the <statvfs> library.

f_bsize;    /* Filesystem block size */
f_frsize;   /* Fragment size */
f_blocks;   /* Size of fs in f_frsize units */
f_bfree;    /* Number of free blocks */
f_bavail;   /* Number of free blocks for

Network

Network metrics are represented in a list for each network interface. The metrics are obtained via /proc/net/dev.

Meaning of the metrics (r)received, (t)transmitted:

bytes The total number of bytes of data transmitted or received by the interface.

packets The total number of packets of data transmitted or received by the interface.

errs The total number of transmit or receive errors detected by the device driver.

drop The total number of packets dropped by the device driver.

fifo The number of FIFO buffer errors.

frame The number of packet framing errors.

colls The number of collisions detected on the interface.

compressed The number of compressed packets transmitted or received by the device driver. (This appears to be unused in the 2.2.15 kernel.)

carrier The number of carrier losses detected by the device driver.

multicast The number of multicast frames transmitted or received by the device driver.

Query Catalog

Here we describe the APIs used for interacting with query catalog.

Getting All Queries

To get all queries registered at NebulaStream.

API: /queryCatalog/allRegisteredQueries\ Verb: GET\ Response Code: 200 OK

Example:

Response:

{[
"system_generated_query_id": "query_string" 
]}

Getting Queries With Status

To get all queries with a specific status form NebulaStream.

API: /queryCatalog/queries?status=<Status> Verb: GET\ Response Code: 200 OK

Example:

Request:

/queryCatalog/queries?status=Running

Response:

{[
"system_generated_query_id": "query_string" 
]}

Source Catalog

Getting All Logical Source

To get all queries registered at NebulaStream.

API: /sourceCatalog/allLogicalSource Verb: GET\ Response Code: 200 OK

Example:

Response:

{[
"logical_source_name": "logical_source_schema" 
]}

Getting All Physical Source For a Logical Source

To get all physical sources for a given logical source.

API: /sourceCatalog/allPhysicalSource?logicalSourceName=<logical source name> Verb: GET\ Response Code: 200 OK

Example:

Request:

 /sourceCatalog/allPhysicalSource?logicalSourceName=Physical Sources

Response:

{"Physical Sources":  [physical_source_string]}

Get Schema of a Logical Source

Retrieve the schema for a logical Source.

API: /sourceCatalog/schema?logicalSourceName=<logical source name> Verb: GET\ Response Code: 200 OK

Example:

Request:

 /sourceCatalog/schema?logicalSourceName=default_logical

Response: A Protobuf encoded schema (SerializableSchema).

Add Logical Source

To add a logical source.

API: /sourceCatalog/addLogicalSource Verb: POST\ Response Code: 200 OK

Example:

Request:

{"logicalSourceName": "logical_source_name",
"schema": "Schema::create()->addField(\"test\",INT32);"}

Response:

{"Success": "true"}

To add a logical source as a protobuf Object:

API: /sourceCatalog/addLogicalSource-ex
Verb: POST
Response Code: 200 OK

Example:

Request: A Protobuf encoded source name and schema.

Response: {"Success": "true"}

Update Logical Source

To Update a logical Source.

API: /sourceCatalog/updateLogicalSource Verb: POST\ Response Code: 200 OK

Example:

Request:

{"logicalSourceName": "logical_source_name",
"schema": "Schema::create()->addField(\"test\",INT32);"}

Response:

{"Success": "true"}

Delete Logical Source

To delete a logical source.

API: /sourceCatalog/deleteLogicalSource?sourceName=<logical source name> Verb: DELETE\ Response Code: 200 OK

Example:

Request:

/sourceCatalog/deleteLogicalSource?logicalSourceName=logical_source_name

Response:

{"Success": "true"}

UDF Catalog

The REST interface provides the the following API to manipulate the UDF catalog.

The API encodes Java UDF descriptors using the following Protobuf message.

message JavaUdfDescriptorMessage {
  message JavaUdfClassDefinition {
    string class_name = 1;
    bytes byte_code = 2;
  }

  string udf_class_name = 1;
  string udf_method_name = 2;
  bytes serialized_instance = 3;
  repeated JavaUdfClassDefinition classes = 4;
}

Register a Java UDF

  • API: /query/registerJavaUdf
  • Verb: POST

Request contents:

A Protobuf message encoding a UDF name and a Java UDF descriptor:

message RegisterJavaUdfRequest {
  string udf_name = 1;
  JavaUdfDescriptorMessage java_udf_descriptor = 2;
}

Status codes:

  • 200 OK, if the UDF could be registered.
  • 400 Bad Request, if the UDF already exists.

Get a Java UDF descriptor

  • API: /query/getUdfDescriptor?udfName=…
  • Verb: GET

Request contents:

A URI with the UDF name passed as the udfName parameter.

Result:

A Protobuf message encoding if the UDF was found and, if it was found, a Java UDF descriptor:

message GetJavaUdfDescriptorResponse {
  bool found = 1;
  optional JavaUdfDescriptorMessage java_udf_descriptor = 2;
}

Status codes:

  • 200 OK, if the UDF was found or not found.
  • 400 Bad Request, if udfName parameter is missing.
  • 400 Bad Request, if the URL contains additional parameters.

Remove UDF

  • API: /query/removeUdf?udfName=…
  • Verb: DELETE

Request contents:

A URI with the UDF name passed as the udfName parameter.

Result:

A JSON object indicating if the UDF was removed. (Note that it is not an error condition if a client tries to remove an unknown UDF because it could have been removed by another client.)

{
  "removed": true
}

Status codes:

  • 200 OK, if the UDF was removed.
  • 200 OK, if the UDF was not found.
  • 400 Bad Request, if udfName parameter is missing.
  • 400 Bad Request, if the URL contains additional parameters.

List UDFs

  • API: /query/listUdfs
  • Verb: GET

Request contents:

Empty.

Result:

A JSON object containing a list of Java UDFs in the UDF catalog.

{
  "udfs": [ "my_udf" ]
}

Status codes:

  • 200

O K

 
rest_api.txt · Last modified: 2022/03/04 17:34 by 134.96.191.189
 
Recent changes RSS feed Creative Commons License Donate Powered by PHP Valid XHTML 1.0 Valid CSS Driven by DokuWiki