C++ API

Create a new query

    auto stream = Query::from("StreamName");

Filter

    stream.filter(Attribute("f1") > 10);
    stream.filter(Attribute("f1") > 10 && Attribute("f2") < 10);

Projection

    stream.project(Attribute("f1"), Attribute("f2"));
    // projection with field rename
    stream.project(Attribute("f1").as("f1_new"));

Map

    stream.map(Attribute("f1") = Attribute("f1") + 1);
    stream.map(Attribute("f1")++);
    stream.map(Attribute("new_field") = Attribute("f1") * Attribute("f2"));
    stream.map(Attribute("new_field") = POWER(Attribute("f1"), Attribute("f2"))); // raises f1 to the power of f2
    stream.map(Attribute("new_field") = Attribute("f1") % Attribute("f2")); // modulo operator

WatermarkAssigner

    // Assign the watermark from the ts field with the given allowed lateness (e.g., 10 ms)
    stream.assignWatermark(EventTimeWatermarkStrategyDescriptor::create(Attribute("ts"), Milliseconds(millisecondOfallowedLateness), Milliseconds()));
    // Assign the watermark using ingestion time
    stream.assignWatermark(IngestionTimeWatermarkStrategyDescriptor::create());

Window

Windows divide an infinite data stream into finite data sets, which allows us to compute aggregations over them.

Types of Windows

First, when creating a window, you have to decide which type of window to use. NES supports two types of windows: tumbling and sliding windows.

    // General Syntax of a Window
    stream.window(WindowType)
        [.byKey(KeyedAttribute)]  // optional, see Keyed Windows and Global Windows
        .apply( AggregationFunction [->as(NameOfAggregatedColumn)] )  // optional

Tumbling Window

The following code snippet shows an event-time tumbling window with a size of 10 seconds. Milliseconds and Minutes are also supported as window sizes.

    // event-time based tumbling window
    stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        [.byKey(KeyedAttribute)]
        .apply(AggregationFunction [->as(NameOfAggregatedColumn)] )

This tumbling window has a fixed size of 10s.
First, the window collects tuples for 10s. Then, once the 10s are over, it computes the aggregation and outputs the result to the next operator (e.g., a sink). Finally, it discards all tuples inside the window and starts over, collecting tuples for another 10s and applying the aggregation function again. Tumbling windows do not overlap.

Sliding Window

The following code snippet shows an event-time sliding window with a size of 10ms and a slide size of 5ms. Minutes and Seconds are also possible here.

    // event-time based sliding window
    stream.window(SlidingWindow::of(EventTime(Attribute("timestamp")), Milliseconds(10), Milliseconds(5)))
        [.byKey(KeyedAttribute)]
        .apply(AggregationFunction [->as(NameOfAggregatedColumn)] )

This sliding window has a fixed size of 10ms and slides every 5ms. This means that every 5ms the aggregation function is triggered to compute the aggregation over the last 10ms. As a result, consecutive windows of this type overlap.

Keyed Windows and Global Windows

    // General Syntax of a Keyed Window
    stream.window(WindowType)
        .byKey(KeyedAttribute)
        .apply(AggregationFunction [->as(NameOfAggregatedColumn)])

    // General Syntax of a Global Window
    stream.window(WindowType)
        .apply(AggregationFunction [->as(NameOfAggregatedColumn)])

The main difference between keyed and global windows is the use of the byKey() function. Using byKey() splits the data stream into multiple keyed data streams. The key can be any attribute of the data stream. For example, suppose we have a data stream called “vehicle”, which collects data from all vehicles and has the following attributes: id, vehicle_type, speed. The attribute vehicle_type takes the values “car”, “bus”, and “bike”. A keyed window on the attribute vehicle_type would then create three data streams: one where vehicle_type is “car”, another where vehicle_type is “bus”, and a third where vehicle_type is “bike”. These three data streams can then compute the aggregation function in parallel.

Currently, we support keys of the following data types:
In contrast, a global window does not split the data stream into keyed data streams. The following code snippets show keyed and global variants of a tumbling and a sliding window.

Example: Keyed Windows

    // Tumbling window with a window size of 10 s
    stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        .byKey(Attribute("f2"))
        .apply(Sum(Attribute("f2")));

    // Sliding window with a window size of 10 ms and a slide size of 5 ms
    stream.window(SlidingWindow::of(EventTime(Attribute("timestamp")), Milliseconds(10), Milliseconds(5)))
        .byKey(Attribute("f2"))
        .apply(Sum(Attribute("f2")));

Example: Global Windows

    // Tumbling window with a window size of 10 s
    stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        .apply(Sum(Attribute("f2")));

    // Sliding window with a window size of 10 ms and a slide size of 5 ms
    stream.window(SlidingWindow::of(EventTime(Attribute("timestamp")), Milliseconds(10), Milliseconds(5)))
        .apply(Sum(Attribute("f2")));

Aggregation Functions

The following aggregations are supported by NES:
If you want to give the column that results from applying the aggregation a specific name, you need to append as() to the aggregation function. However, renaming currently does not work in NES, so please do not rename the column. The default name for the column is “AggregationFunction_Attribute”. For example, a sum over the attribute “f2” produces a column called “sum_f2”. The following code snippet shows how to use the different aggregation functions with a keyed tumbling window.

    // Sum
    stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        .byKey(Attribute("f2"))
        .apply(Sum(Attribute("f2"))->as(Attribute("sum_f2")));

    // Min
    stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        .byKey(Attribute("f2"))
        .apply(Min(Attribute("f2"))->as(Attribute("min_f2")));

    // Max
    stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        .byKey(Attribute("f2"))
        .apply(Max(Attribute("f2"))->as(Attribute("max_f2")));

    // Count
    stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        .byKey(Attribute("f2"))
        .apply(Count()->as(Attribute("count_f2")));

    // Avg (Average)
    stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        .byKey(Attribute("f2"))
        .apply(Avg(Attribute("f2"))->as(Attribute("avg_f2")));

    // Median
    stream.window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)))
        .byKey(Attribute("f2"))
        .apply(Median(Attribute("f2"))->as(Attribute("median_f2")));

`not implemented`

    // Multiple Aggregation Functions
    stream.window(TumblingWindow::of(Seconds(10)), Sum(Attribute("f2")), Min(Attribute("f2")));

    // Count Window
    stream.window(TumblingWindow::of(Count(10))).byKey(Attribute("f2")).apply(Sum(Attribute("f2")));

Join

Join works similarly to SQL's JOIN with a WHERE clause. After joining two data streams, you can apply any other operator you want (in this case, we use a tumbling window).
    auto purchases = Query::from("purchases");
    auto tweets = Query::from("tweets");
    auto message = purchases.joinWith(tweets)
        .where(Attribute("purchases$user_id"))
        .equalsTo(Attribute("tweets$user_id"))
        .window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)));

Union

Union works like SQL's UNION. This means that both data streams being combined must have the same schema. Here, we apply a projection first so that both cars and bikes have the same schema before using unionWith.

    auto cars = Query::from("cars").project(Attribute("f1"));
    auto bikes = Query::from("bikes").project(Attribute("f1"));
    auto vehicles = cars.unionWith(bikes);

Sink

    stream.sink(FileSinkDescriptor::create("PATH"));
    stream.sink(ZMQSinkDescriptor::create("Host", 8081)); // host, port
    stream.sink(KAFKASinkDescriptor::create("Topic", "Brokers", 10000)); // topic, brokers, timeout
    stream.sink(PrintSinkDescriptor::create());

Define in Catalog `not implemented`

    auto nesEnvironment = NESEnvironment::createLocal();
    auto nesEnvironment = NESEnvironment::connect(host, port);

ExecuteQuery `not implemented`

    nesEnvironment->execute(stream);

Define in Catalog `not implemented`

    nesEnvironment->registerInCatalog(stream, "NewLogicalStream");

Complex Event Processing Operators

Unary Operators
    stream->filter(Attribute("f1") > 10 && Attribute("f2") < 10).times();
    stream->filter(Attribute("f1") > 10 && Attribute("f2") < 10).times(2,10);

not yet implemented
    stream->NOT(stream) //TODO

Binary Operators (Event Connectors)

Operators that compose multiple streams into a complex pattern are:
    auto trafficBan = Query::from("cars").filter(Attribute("speed") < 50 && Attribute("count") > 50)
        .andWith(Query::from("airQuality").filter(Attribute("pollution") > 30))
        .window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)));

Pattern Specification Language

    Pattern(trafficBan)
    AND(cars, airQuality)
    WHERE cars.speed < 50 AND cars.count > 50 AND airQuality.pollution > 30
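To make the windowed conjunction above more concrete, here is a minimal standalone sketch of one plausible reading of it: two events satisfy AND when both fall into the same 10-second tumbling window, regardless of their order. This is not NES code. The helpers `windowStart` and `andMatches`, and the use of millisecond timestamps, are hypothetical and introduced only for illustration; the actual matching semantics in NES may differ.

```cpp
#include <cstdint>

// Hypothetical helper: start of the tumbling window that contains `ts`.
// Assumes a positive window size; timestamps and size are in milliseconds.
constexpr std::uint64_t windowStart(std::uint64_t ts, std::uint64_t sizeMs) {
    return ts - (ts % sizeMs);
}

// Assumed AND semantics: both events lie in the same tumbling window,
// and their relative order does not matter.
constexpr bool andMatches(std::uint64_t carTs, std::uint64_t airTs, std::uint64_t sizeMs) {
    return windowStart(carTs, sizeMs) == windowStart(airTs, sizeMs);
}
```

Under these assumptions, a car event at 12 s and an air-quality event at 18 s match in either order, because both belong to the window starting at 10 s, whereas events at 9 s and 11 s fall into different windows and do not match.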
    auto trafficBan = Query::from("cars").filter(Attribute("speed") < 50 && Attribute("count") > 50)
        .seqWith(Query::from("airQuality").filter(Attribute("pollution") > 30))
        .window(TumblingWindow::of(EventTime(Attribute("timestamp")), Seconds(10)));
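The sequence pattern above can be sketched in the same spirit. The following standalone snippet assumes that a sequence requires both events to share a tumbling window and, in addition, that the first event occurs strictly before the second. The function `seqMatches` and the millisecond timestamps are hypothetical names chosen for illustration, not part of the NES API, and the strict ordering is an assumption.

```cpp
#include <cstdint>

// Assumed SEQ semantics: both events share a tumbling window of `sizeMs`
// milliseconds and the first event happens strictly before the second.
constexpr bool seqMatches(std::uint64_t firstTs, std::uint64_t secondTs, std::uint64_t sizeMs) {
    // Integer division yields the index of the tumbling window containing a timestamp.
    return (firstTs / sizeMs == secondTs / sizeMs) && (firstTs < secondTs);
}
```

With a 10-second window, a car event at 12 s followed by an air-quality event at 18 s matches under these assumptions, while the reversed order does not.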
    auto trafficBan = Query::from("cars").filter(Attribute("speed") < 50 && Attribute("count") > 50)
        .orWith(Query::from("airQuality").filter(Attribute("pollution") > 30));

Nested Patterns

not yet implemented

Furthermore, operators can be nested to create complex queries:

Pattern Specification Language

    Pattern(test)
    SEQ(A,AND(B,C))
    WHERE A.val < x AND B.val > y AND C.val = z