C++ API

Create a new query

  auto stream = Query::from("StreamName");

Filter

  stream.filter(Attribute("f1") > 10);
  stream.filter(Attribute("f1") > 10 && Attribute("f2") < 10);

Projection

  stream.project(Attribute("f1"), Attribute("f2"));
 
  // projection with field rename
  stream.project(Attribute("f1").as("f1_new"));

Map

  stream.map(Attribute("f1") = Attribute("f1") + 1);
  stream.map(Attribute("f1")++);
  stream.map(Attribute("new_field") = Attribute("f1") * Attribute("f2"));
  stream.map(Attribute("new_field") = POWER(Attribute("f1"), Attribute("f2"))); // raises f1 to the power of f2
  stream.map(Attribute("new_field") = Attribute("f1") % Attribute("f2")); // modulo operator

WatermarkAssigner

  // Assign the watermark from the ts field with 10 ms of allowed lateness
  stream.assignWatermark(EventTimeWatermarkStrategyDescriptor::create(Attribute("ts"), Milliseconds(10), Milliseconds()));
 
  // Assign the watermark using ingestion time
  stream.assignWatermark(IngestionTimeWatermarkStrategyDescriptor::create());
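A common way to think about an event-time watermark with allowed lateness is that it trails the largest timestamp seen so far by the lateness bound. The following is a minimal sketch of that idea in plain C++, not the NES implementation: the class `EventTimeWatermark` is hypothetical, timestamps are assumed to be in milliseconds, and a tuple is assumed to be late when its timestamp is already below the current watermark.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical illustration (not part of the NES API): an event-time watermark
// that trails the largest observed timestamp by the allowed lateness.
class EventTimeWatermark {
public:
    explicit EventTimeWatermark(uint64_t allowedLatenessMs)
        : allowedLatenessMs_(allowedLatenessMs) {}

    // Observe a tuple's event timestamp (ms). Returns false if the tuple is late,
    // i.e., its timestamp is below the watermark before this tuple is applied.
    bool observe(uint64_t tsMs) {
        bool onTime = tsMs >= current();
        maxTsMs_ = std::max(maxTsMs_, tsMs);
        return onTime;
    }

    // Current watermark: tuples with smaller timestamps are assumed to have arrived.
    uint64_t current() const {
        return maxTsMs_ > allowedLatenessMs_ ? maxTsMs_ - allowedLatenessMs_ : 0;
    }

private:
    uint64_t allowedLatenessMs_;
    uint64_t maxTsMs_ = 0;
};
```

With 10 ms of allowed lateness, after a tuple with timestamp 100 the watermark is at 90, so a tuple with timestamp 95 is still accepted while one with timestamp 80 counts as late.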

Window

Windows divide an infinite data stream into finite data sets. This allows us to compute aggregations over finite data sets.

Types of Windows

First, when creating a window you have to decide on the type of window you want to use. NES supports two types of windows: tumbling and sliding windows.

  // General Syntax of a Window
  stream.window(WindowType)
       [.byKey(KeyedAttribute)]         // optional, see Keyed Windows and Global Windows
        .apply(AggregationFunction
       [->as(NameOfAggregatedColumn)])  // optional

Tumbling Window

The following code snippet shows an event-time tumbling window with a size of 10 seconds. Window sizes can also be given in Milliseconds or Minutes.

  // event-time based tumbling window
  stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
       [.byKey(KeyedAttribute)]
        .apply(AggregationFunction
       [->as(NameOfAggregatedColumn)] )

This tumbling window has a fixed size of 10 s. The window first collects tuples for 10 s. Once the 10 s are over, it computes the aggregation and outputs the result to the next operator (e.g., a sink). It then discards all tuples of the window and starts over, collecting tuples for the next 10 s and applying the aggregation function again. Consecutive tumbling windows do not overlap, so every tuple belongs to exactly one window.
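Because tumbling windows do not overlap, the window of a tuple can be computed directly from its timestamp. The sketch below is a hypothetical illustration in plain C++ (the functions `tumblingWindowStart` and `tumblingSum` are not part of NES); it assumes millisecond timestamps and windows aligned to multiples of the window size starting at 0.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

// Hypothetical helper: start of the tumbling window [start, start + size) containing ts.
uint64_t tumblingWindowStart(uint64_t ts, uint64_t size) {
    assert(size > 0);
    return ts - (ts % size);
}

// Hypothetical helper: sum the values of (timestamp, value) tuples per tumbling window.
// The keys of the result are the window start timestamps.
std::map<uint64_t, int64_t> tumblingSum(const std::vector<std::pair<uint64_t, int64_t>>& tuples,
                                        uint64_t size) {
    std::map<uint64_t, int64_t> sums;
    for (const auto& t : tuples) {
        sums[tumblingWindowStart(t.first, size)] += t.second;  // each tuple hits exactly one window
    }
    return sums;
}
```

For a 10 s window (10000 ms), tuples at 0 ms and 9999 ms land in the window starting at 0, while a tuple at 10000 ms opens the next window.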

Sliding Window

The following code snippet shows an event-time sliding window with a size of 10ms and a slide size of 5ms. Minutes and Seconds are also possible here.

  // event-time based sliding window
  stream.window(SlidingWindow::of(EventTime(Attribute("timestamp")), Milliseconds(10), Milliseconds(5)))
       [.byKey(KeyedAttribute)]
        .apply(AggregationFunction
       [->as(NameOfAggregatedColumn)] )

This sliding window has a fixed size of 10 ms and slides every 5 ms. This means that every 5 ms the aggregation function is triggered to compute the aggregation over the last 10 ms. As a result, consecutive sliding windows overlap: with these parameters, every tuple belongs to 10 ms / 5 ms = 2 windows.
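Since sliding windows overlap, a single tuple has to be assigned to several windows. The following hypothetical sketch (plain C++, the function `slidingWindowStarts` is not part of NES) lists all windows containing a timestamp, assuming windows of the form [start, start + size) whose starts are multiples of the slide, beginning at 0.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper: start timestamps of all sliding windows [start, start + size)
// that contain ts, assuming window starts are multiples of slide and start at 0.
std::vector<uint64_t> slidingWindowStarts(uint64_t ts, uint64_t size, uint64_t slide) {
    assert(slide > 0 && size > 0);
    std::vector<uint64_t> starts;
    uint64_t lastStart = ts - (ts % slide);  // latest window start that is <= ts
    for (uint64_t start = lastStart;; start -= slide) {
        if (start + size <= ts) break;  // this window ends before ts, so do older ones
        starts.push_back(start);
        if (start < slide) break;       // no earlier window start (avoid unsigned underflow)
    }
    return starts;
}
```

For the 10 ms / 5 ms window above, a tuple at 12 ms falls into the windows starting at 10 ms and 5 ms, i.e., [10, 20) and [5, 15).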

Keyed Windows and Global Windows

  // General Syntax of a Keyed Window
  stream.window(WindowType)
        .byKey(KeyedAttribute)
        .apply(AggregationFunction
       [->as(NameOfAggregatedColumn)])
  // General Syntax of a Global Window
  stream.window(WindowType)
        .apply(AggregationFunction
       [->as(NameOfAggregatedColumn)])

The main difference between keyed and global windows is the usage of the byKey() function. Using byKey() splits the data stream into multiple keyed data streams. The key can be any attribute of the data stream. For example, let us say we have a data stream called “vehicle”, which collects data from all vehicles and has the following attributes: id, vehicle_type, speed. The attribute vehicle_type takes the values “car”, “bus”, and “bike”. A keyed window on the attribute vehicle_type would then create three data streams: one where vehicle_type is “car”, one where vehicle_type is “bus”, and one where vehicle_type is “bike”. These three data streams can then compute the aggregation function in parallel.

Currently, we support keys of the following data types:

  • integer
  • float
  • double
  • char
  • bool
  • Array of char

In contrast, a global window would not split the data stream into keyed data streams.
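To make the difference concrete, the following hypothetical sketch computes a sum over the tuples of one window of the “vehicle” stream, once keyed by vehicle_type and once globally. The struct `Vehicle` and the functions `keyedSpeedSum` and `globalSpeedSum` are illustrative only and not part of NES; the sketch assumes the window's tuples have already been collected into a vector.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical tuple of the "vehicle" stream (id, vehicle_type, speed).
struct Vehicle {
    int id;
    std::string vehicle_type;
    double speed;
};

// Keyed window: one aggregate (here: sum of speed) per distinct vehicle_type.
std::map<std::string, double> keyedSpeedSum(const std::vector<Vehicle>& window) {
    std::map<std::string, double> sums;
    for (const Vehicle& v : window) {
        sums[v.vehicle_type] += v.speed;
    }
    return sums;
}

// Global window: a single aggregate over all tuples of the window.
double globalSpeedSum(const std::vector<Vehicle>& window) {
    double sum = 0.0;
    for (const Vehicle& v : window) {
        sum += v.speed;
    }
    return sum;
}
```

The keyed variant produces one result per key (car, bus, bike), while the global variant produces one result for the whole window.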

The following code snippets show a keyed and global tumbling and sliding window.

Example: Keyed Windows
  // Tumbling Window
  stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        .byKey(Attribute("f2"))
        .apply(Sum(Attribute("f2")));
 
  // Sliding window 
  stream.window(SlidingWindow::of(EventTime(Attribute("timestamp")), Milliseconds(10), Milliseconds(5)))
        .byKey(Attribute("f2"))
        .apply(Sum(Attribute("f2")));

Example: Global Windows
  // Tumbling Window
  stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        .apply(Sum(Attribute("f2")));
 
  // Sliding window with window size of 10 ms and slide size 5 ms 
  stream.window(SlidingWindow::of(EventTime(Attribute("timestamp")), Milliseconds(10), Milliseconds(5)))
        .apply(Sum(Attribute("f2")));
 

Aggregation Functions

The following aggregations are supported by NES:

  • Sum
  • Min
  • Max
  • Count
  • Average
  • Median
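The semantics of these aggregation functions over the values of a single window can be sketched as follows. This is a hypothetical illustration in plain C++, not how NES computes them (NES aggregates incrementally inside its window operator); the names `Aggregates` and `aggregate` are made up, and defining the median of an even number of values as the mean of the two middle values is an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical result of all supported aggregations for one window.
struct Aggregates {
    double sum, min, max, avg, median;
    std::size_t count;
};

// Compute Sum, Min, Max, Count, Average, and Median over a non-empty window.
// The values are taken by copy because they are sorted for Min/Max/Median.
Aggregates aggregate(std::vector<double> values) {
    assert(!values.empty());
    std::sort(values.begin(), values.end());
    Aggregates a{};  // value-initialized, so sum starts at 0
    a.count = values.size();
    a.min = values.front();
    a.max = values.back();
    for (double v : values) a.sum += v;
    a.avg = a.sum / static_cast<double>(a.count);
    std::size_t mid = a.count / 2;
    // Assumption: for an even count, the median is the mean of the two middle values.
    a.median = (a.count % 2 == 1) ? values[mid] : (values[mid - 1] + values[mid]) / 2.0;
    return a;
}
```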

To give the column that results from the aggregation a specific name, you append ->as() to the aggregation function. However, renaming currently does not work in NES, so please do not rename the column. The default column name follows the pattern “AggregationFunction_Attribute”; for example, a sum over the attribute “f2” yields a column called “sum_f2”.

The following code snippet shows how to use the different aggregation functions with a keyed tumbling window.

  // Sum
  stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        .byKey(Attribute("f2"))
        .apply(Sum(Attribute("f2"))->as(Attribute("sum_f2")));
 
  // Min
  stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        .byKey(Attribute("f2"))
        .apply(Min(Attribute("f2"))->as(Attribute("min_f2")));
 
  // Max
  stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        .byKey(Attribute("f2"))
        .apply(Max(Attribute("f2"))->as(Attribute("max_f2")));
 
  // Count
  stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        .byKey(Attribute("f2"))
        .apply(Count()->as(Attribute("count_f2")));
 
  // Avg (Average)
  stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        .byKey(Attribute("f2"))
        .apply(Avg(Attribute("f2"))->as(Attribute("avg_f2")));
 
  // Median
  stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        .byKey(Attribute("f2"))
        .apply(Median(Attribute("f2"))->as(Attribute("median_f2")));

`not implemented`

  // Multiple Aggregation Functions
  stream.window(TumblingWindow::of(Seconds(10)), Sum(Attribute("f2")), Min(Attribute("f2")));
 
  // Count Window
  stream.window(TumblingWindow::of(Count(10))).byKey(Attribute("f2")).apply(Sum(Attribute("f2")));

Join

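Join works similarly to SQL's JOIN with a WHERE clause: where() and equalsTo() name the attribute of the left and of the right stream whose values must be equal. Attributes are qualified with their stream name using $, e.g., purchases$user_id. The join is computed over a window; in this example, a tumbling window of 10 seconds is used.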

  auto purchases = Query::from("purchases");
  auto tweets = Query::from("tweets");
  auto message = purchases.joinWith(tweets)
                          .where(Attribute("purchases$user_id"))
                          .equalsTo(Attribute("tweets$user_id"))
                          .window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)));
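Conceptually, such a windowed equi-join pairs tuples from both streams that have equal keys and fall into the same window. The sketch below is a hypothetical, simplified hash join in plain C++ and not the NES join operator: the structs `Purchase` and `Tweet` model only the join key and timestamp, `windowedJoin` is made up, and all tuples are assumed to be available up front rather than arriving as a stream.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical, simplified tuples: only the fields used by the join are modeled.
struct Purchase { int user_id; uint64_t timestamp; };
struct Tweet    { int user_id; uint64_t timestamp; };

// Tumbling-window equi-join: emit (purchase, tweet) pairs with equal user_id
// whose timestamps fall into the same window of `size` time units.
std::vector<std::pair<Purchase, Tweet>> windowedJoin(const std::vector<Purchase>& purchases,
                                                     const std::vector<Tweet>& tweets,
                                                     uint64_t size) {
    // Build phase: index tweets by window start, then by user_id.
    std::unordered_map<uint64_t, std::unordered_multimap<int, Tweet>> index;
    for (const Tweet& t : tweets) {
        index[t.timestamp - t.timestamp % size].emplace(t.user_id, t);
    }
    // Probe phase: look up tweets with the same user_id in the purchase's window.
    std::vector<std::pair<Purchase, Tweet>> result;
    for (const Purchase& p : purchases) {
        auto window = index.find(p.timestamp - p.timestamp % size);
        if (window == index.end()) continue;
        auto range = window->second.equal_range(p.user_id);
        for (auto it = range.first; it != range.second; ++it) {
            result.emplace_back(p, it->second);
        }
    }
    return result;
}
```

Note that two tuples with equal keys are not joined if they fall into different windows, even if their timestamps are close.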

Union

Union works like SQL's UNION: both data streams that are combined must have the same schema. Here, a projection is applied first so that cars and bikes have the same schema before unionWith is called.

  auto cars = Query::from("cars").project(Attribute("f1"));
  auto bikes = Query::from("bikes").project(Attribute("f1"));
  auto vehicles = cars.unionWith(bikes);
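The schema requirement can be illustrated with a small hypothetical model in plain C++: the struct `Stream` and the function `unionWith` below are made up for illustration and are not the NES classes. The sketch rejects inputs with different schemas and otherwise keeps all rows of both inputs; it simply appends the rows, whereas a streaming union would interleave tuples as they arrive.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical minimal model: a stream is a schema (field names) plus rows of values.
struct Stream {
    std::vector<std::string> schema;
    std::vector<std::vector<int>> rows;
};

// Union: both inputs must have the same schema; the output contains all rows of both.
Stream unionWith(const Stream& left, const Stream& right) {
    if (left.schema != right.schema) {
        throw std::invalid_argument("unionWith: schemas differ");
    }
    Stream out{left.schema, left.rows};
    out.rows.insert(out.rows.end(), right.rows.begin(), right.rows.end());
    return out;
}
```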

Sink

  stream.sink(FileSinkDescriptor::create("PATH"));
 
  stream.sink(ZMQSinkDescriptor::create("Host", 8081)); //host, port
  stream.sink(KAFKASinkDescriptor::create("Topic", "Brokers", 10000)); //topic, brokers, timeout
  stream.sink(PrintSinkDescriptor::create());

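Create or Connect to an Environment `not implemented`

  // either create a local environment ...
  auto nesEnvironment = NESEnvironment::createLocal();
  // ... or connect to a running NES instance
  // auto nesEnvironment = NESEnvironment::connect(host, port);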

ExecuteQuery `not implemented`

  nesEnvironment->execute(stream);

Define in Catalog `not implemented`

  nesEnvironment->registerInCatalog(stream, "NewLogicalStream");

Complex Event Processing Operators

Unary Operators

  • Iteration (times): allows an event to occur multiple times; it can be restricted by a minimal number of occurrences N and a maximal number of occurrences M, with N, M > 0
  • Note: currently, only the minimal number of occurrences is considered by the implementation
  stream.filter(Attribute("f1") > 10 && Attribute("f2") < 10).times();
 
  stream.filter(Attribute("f1") > 10 && Attribute("f2") < 10).times(2, 10);
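The intended semantics of times(N, M) can be sketched as a count check over a batch of events, e.g., the events of one window. This is a hypothetical illustration in plain C++, not the NES implementation: the struct `Event` and the function `iterationMatches` are made up, the filter from the example above is hard-coded, and the sketch also checks the maximum M even though, as noted above, NES currently only considers the minimum.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical event with the two attributes used in the example filter.
struct Event { int f1; int f2; };

// Hypothetical check for times(minOccurrences, maxOccurrences) over a batch of events:
// count events matching (f1 > 10 && f2 < 10) and accept if the count lies within
// [minOccurrences, maxOccurrences]. A missing maximum means "no upper bound".
bool iterationMatches(const std::vector<Event>& events, std::size_t minOccurrences,
                      std::optional<std::size_t> maxOccurrences = std::nullopt) {
    std::size_t count = 0;
    for (const Event& e : events) {
        if (e.f1 > 10 && e.f2 < 10) ++count;
    }
    if (count < minOccurrences) return false;
    return !maxOccurrences || count <= *maxOccurrences;
}
```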

not yet implemented

  • Negation (NOT): requires the absence of event occurrence
  stream->NOT(stream) //TODO

Binary Operators (Event Connectors)

Operators that compose multiple streams into a complex pattern are:

  • AND Operator (AND): combines multiple streams; the order of the detected events is irrelevant
  auto trafficBan = Query::from("cars").filter(Attribute("speed") < 50 && Attribute("count") > 50)
                                       .andWith(Query::from("airQuality").filter(Attribute("pollution") > 30))
                                       .window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)));

Pattern Specification Language

  Pattern(trafficBan)
  AND(cars, airQuality)
  WHERE cars.speed < 50 AND cars.count > 50 AND airQuality.pollution > 30

  • Sequence Operator (SEQ): combines multiple streams and requires an exact order of events to accept tuple(s), i.e., SEQ(A,B) requires B to occur after A
  auto trafficBan = Query::from("cars").filter(Attribute("speed") < 50 && Attribute("count") > 50)
                                       .seqWith(Query::from("airQuality").filter(Attribute("pollution") > 30))
                                       .window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)));
  • OR Operator (OR): combines multiple streams and expects at least one stream to match its conditions
  auto trafficBan = Query::from("cars").filter(Attribute("speed") < 50 && Attribute("count") > 50)
                                       .orWith(Query::from("airQuality").filter(Attribute("pollution") > 30));
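The difference between the three event connectors can be sketched by looking only at the timestamps of the events that passed each side's filter within one window. This is a hypothetical illustration in plain C++, not the NES implementation: the alias `Timestamps` and the functions `andMatches`, `seqMatches`, and `orMatches` are made up, and treating SEQ as "strictly later timestamp" is an assumption for events with equal timestamps.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical input: timestamps of the events of one side that already passed
// that side's filter and fall into the same window.
using Timestamps = std::vector<uint64_t>;

// AND(A, B): both sides matched at least once; the order is irrelevant.
bool andMatches(const Timestamps& a, const Timestamps& b) {
    return !a.empty() && !b.empty();
}

// SEQ(A, B): some B event occurs strictly after some A event (assumption for ties).
bool seqMatches(const Timestamps& a, const Timestamps& b) {
    for (uint64_t ta : a)
        for (uint64_t tb : b)
            if (tb > ta) return true;
    return false;
}

// OR(A, B): at least one side matched.
bool orMatches(const Timestamps& a, const Timestamps& b) {
    return !a.empty() || !b.empty();
}
```

For a cars event at 5000 ms and an airQuality event at 2000 ms, AND(cars, airQuality) matches, SEQ(cars, airQuality) does not, and SEQ(airQuality, cars) does.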

Nested Patterns

not yet implemented

To create more complex queries, operators can be nested:

Pattern Specification Language

  Pattern(test)
  SEQ(A, AND(B, C))
  WHERE A.val < x AND B.val > y AND C.val = z
 
query_api.txt · Last modified: 2021/12/13 09:25 by 134.96.191.189
 