Never Duplicate A Window Again - WSO2 Siddhi Event Window

A new feature known as Event Window is introduced in WSO2 Siddhi 3.1.1 version, which is quite similar to the named window of Esper CEP in some aspects. This article presents the application and the architecture of Event Window using a simple example. According to Siddhi version 3.1.0, a window can be defined on a stream inside a query and the output can be used in the same query itself. For example, consider a scenario where WSO2 CEP is used to analyze the sensor reading of a smart home which has multiple sensors in number of rooms. All the sensors are sending their reading to an input stream named SensorStream and there are different analytics components interested in various statistic information.

If there are two analytic components named A and B which are interested in the maximum reading of each sensors in every room in last 5 seconds and the average sensor reading of each type of sensors in the smart home in last 5 seconds respectively, it can be achieved using a query provided below.
define stream SensorStream (name string, value float, roomNo int, deviceID string);

@info(name = 'query0')
from SensorStream#window.timeBatch(1 second)
select name, max(value) as maxValue, roomNo
group by name, roomNo
insert into MaxSensorReadingPerRoomStream;

@info(name = 'query1')
from SensorStream#window.timeBatch(1 second)
select name, max(value) as maxValue, roomNo
group by name, roomNo
insert into AverageSensorReadingPerBuildingStream;
In this definition, the events arrived in last second will be stored in two windows which are identical  to each other. The output is determined only by the selection and grouping. If the time interval is large enough, with thousands of sensors used in a smart house, several megabytes of memory will be wasted by the duplicate events. Also the window defined in a query cannot be reused for any other purpose even inside the same query. To overcome these problems, Event Window is introduced as a global window which can be accessed from any queries any number of times.

The Siddhi queries and the architecture discussed in this article are presented according to the Siddhi version 3.1.1.

Event Window Definition
The event window must be defined before using it in queries. The event window definition contains a unique name, set of attributes assigned to specific types, the window type and optional output event type.

define window <event window name> (<attribute name> <attribute type>, <attribute name> <attribute type>, ... ) <window type>(<parameter>, <parameter>, …) output <output event type>;
event window name A unique name to identify the window.
attribute name A unique name to identify the attribute inside the window.
attribute type Any allowed attribute types (Currently Siddhi allows the following types:  int, long, float, double, string, bool, object).
window type Any window types which are available in Siddhi (See the list of available windows here: Inbuilt Windows).
parameter Required parameters for the window, which are defined by the window type.
output event type This property is used to filter the output events of the window. Currently Siddhi allows the following output event types:
  • current events: Emits the current events only
  • expired events: Emits the expired events only
  • all events: Emits both current and expired events. Used as default output event type if nothing is specified.
A sample EventTable definition is given below:
define window SensorWindow (name string, value float, roomNo int, deviceID string) timeBatch(1 second);

The above window emits both current and expired events as the output, which is identical to the following definition where the output event type is explicitly defined to all events.
define window SensorWindow (name string, value float, roomNo int, deviceID string) timeBatch(1 second) output all events;

To restrict the output events type to current events only, use the following:
define window SensorWindow (name string, value float, roomNo int, deviceID string) timeBatch(1 second) output current events;

To restrict the output events type to expired events only, use the following:
define window SensorWindow (name string, value float, roomNo int, deviceID string) timeBatch(1 second) output expired events;


Input To Event Window
An event window can receive input events from one or more than one streams. Filters and selection can be applied to input stream before inserting into the window. An example of inserting events into the window is given below.
define stream SensorStream (name string, value float, roomNo int, deviceID string);
define stream KitchenSensorStream (name string, value float, deviceID string);
define window SensorWindow (name string, value float, roomNo int, deviceID string) timeBatch(1 second);

@info(name = 'query0')
from SensorStream
insert into SensorWindow;

@info(name = 'query1')
from  KitchenSensorStream
select name, value, 10 as  roomNo, deviceID
insert into SensorWindow;


Output From Event Window
An event window can be used as a stream in any queries. The event window’s output events will be passed to those queries in the same was as any other regular streams. However, note that the output stream of event window can contain expired events and a traditional window cannot be used for the output stream of an event window.
In the following queries, the output of SensorWindow is used for two different calculations.
@info(name = 'query2')
from SensorWindow
select name, max(value) as maxValue, roomNo
group by name, roomNo
insert into MaxSensorReadingPerRoomStream;

The same event window can be used in other queries as well.
@info(name = 'query3')
from  SensorWindow
select name, max(value) as maxValue, roomNo
group by name, roomNo
insert into AverageSensorReadingPerBuildingStream;

NOTE:
Adding window to the output events of an event window is not allowed. Therefore the following query is invalid, and trying to use such implementations will throw ExecutionPlanValidationException.
@info(name = 'invalidQuery')
from  SensorWindow#window.length(2)
insert into OutputStream;

An ordinal window cannot be used on the output stream of an event window, but the output stream of an event window can be inserted into another event window.

Joining Event Windows
An event window can be joined with another event stream, event table or event window. The following section illustrates the behavior of event window join in all these cases.

Joining Event Window with Stream
Joining an event window with a stream works in the same way as joining a stream with another stream. The join will happen whenever there is an event arrived into the stream or whenever there is an event emitted from the window. Example of joining two streams and an equivalent event window join are provided below:

Joining two streams
define stream TempStream(deviceID long, roomNo int, temp double);
define stream RegulatorStream(deviceID long, roomNo int, isOn bool);

@info(name = 'query1')
from TempStream[temp > 30.0]#window.time(1 min)
join RegulatorStream[isOn == false]#window.length(1) as R
on TempStream.roomNo == R.roomNo
select TempStream.roomNo, R.deviceID, 'start' as action
insert into RegulatorActionStream;

An equivalent query where one stream is replaced by event window
define stream TempStream(deviceID long, roomNo int, temp double);
define stream RegulatorStream(deviceID long, roomNo int, isOn bool);
define window TempWindow(deviceID long, roomNo int, temp double) time(1 min);

@info(name = 'query1')
from TempStream[temp > 30.0]
insert into TempWindow;

@info(name = 'query2')
from TempWindow
join RegulatorStream[isOn == false]#window.length(1) as R
on TempWindow.roomNo == R.roomNo
select TempWindow.roomNo, R.deviceID, 'start' as action
insert into RegulatorActionStream;


Joining Event Window with Event Table
In the previous versions, an event table can be joined with another stream only. From the introduction of event window, an event table can be joined with an event window as well. All the events emitted from event window joins with the event table and produce the output. Example of joining a stream with an event table and an equivalent event window join are given below:

Joining an event stream window with an event table
define stream StockStream (symbol string, price float, volume long);
define stream CheckStockStream (symbol string);
define table StockTable (symbol string, price float, volume long);

@info(name = 'query0')
from StockStream
insert into StockTable ;

@info(name = 'query1')
from CheckStockStream#window.lengthBatch(2) join StockTable
on CheckStockStream.symbol==StockTable.symbol
select CheckStockStream.symbol as checkSymbol, StockTable.volume as volume
insert into OutputStream;

An equivalent query where the stream with window is replaced by event window
define stream StockStream (symbol string, price float, volume long);
define stream CheckStockStream (symbol string);
define window CheckStockWindow(symbol string) lengthBatch(2);
define table StockTable (symbol string, price float, volume long);

@info(name = 'query0')
from StockStream
insert into StockTable;

@info(name = 'query1')
from CheckStockStream
insert into CheckStockWindow;

@info(name = 'query2')
from CheckStockWindow join StockTable
on CheckStockWindow.symbol==StockTable.symbol
select CheckStockWindow.symbol as checkSymbol, StockTable.volume as volume
insert into OutputStream;


Joining Event Window with Event Window
Same as joining two event streams, two event windows can be joined. When joining an event window with another event window, the output events of one window is matched with the event chunk of the other window. Example of joining two streams and an equivalent event window join are given below:

Joining two streams
define stream TempStream(deviceID long, roomNo int, temp double);
define stream RegulatorStream(deviceID long, roomNo int, isOn bool);

@info(name = 'query1')
from TempStream[temp > 30.0]#window.time(1 min)
join RegulatorStream[isOn == false]#window.length(1) as R
on TempStream.roomNo == R.roomNo
select TempStream.roomNo, R.deviceID, 'start' as action
insert into RegulatorActionStream;

An equivalent query where both streams are replaced by event window
define stream TempStream(deviceID long, roomNo int, temp double);
define stream RegulatorStream(deviceID long, roomNo int, isOn bool);
define window TempWindow(deviceID long, roomNo int, temp double) time(1 min);
define window RegulatorWindow(deviceID long, roomNo int, isOn bool) length(1);

@info(name = 'query1')
from TempStream[temp > 30.0]
insert into TempWindow;

@info(name = 'query2')
from RegulatorStream[isOn == false]
insert into RegulatorWindow;

@info(name = 'query3')
from TempWindow
join RegulatorWindow as R
on TempWindow.roomNo == R.roomNo
select TempWindow.roomNo, R.deviceID, 'start' as action
insert into RegulatorActionStream;

Joining an event window with itself is also possible and there were no equivalent implementations in previous Siddhi versions. According to the previous join implementations, two window instances are required to perform a join, but when joining an event window with itself, only one window is used.
define stream CSEEventStream (symbol string, price float, volume int);
define window CSEEventWindow (symbol string, price float, volume int) length(2);

@info(name = 'query0')
from  CSEEventStream
insert into
CSEEventWindow;

@info(name = 'query1')
from CSEEventWindow as a
join CSEEventWindow as b
on a.symbol== b.symbol
select a.symbol as symbol, a.price as priceA, b.price as priceB
insert all events into OutputStream;


Sample Application
Using event window, the initial problem of having duplicate events in multiple windows can be solved as shown below.
define stream SensorStream (name string, value float, roomNo int, deviceID string);
define stream SensorWindow (name string, value float, roomNo int, deviceID string) timeBatch(1 second);

@info(name = 'query0')
from SensorStream
insert into SensorWindow;

@info(name = 'query0')
from  SensorWindow
select name, max(value) as maxValue, roomNo
group by name, roomNo
insert into MaxSensorReadingPerRoomStream;

@info(name = 'query1')
from  SensorWindow
select name, avg(value) as maxValue, roomNo
group by name, roomNo
insert into AverageSensorReadingPerBuildingStream;
In the above query, there is only one window which keeps all the events arrived to the SensorStream within 1 second interval and which is used by two different queries.


Architecture
An event window is a Snapshotable entity which contains an ordinary window processor inside it. The event window can be used in place of event streams, therefore it can receive events from stream junction and also it can publish events to the stream junction.

WSO2 Siddhi Event Window Architecture

StreamJunction coordinates the events between streams. All the events passed from input streams of event window will be sent to the EventWindow using InsertIntoWindowCallback. The call back changes the state of expired events to current event and send them to the EventWindow. EventWindow searches for state events and convert them to stream events because state events cannot be handled by window processors. The internal window processor is determined based on the type of the event window. The window processor does the actual processing depending on the window type and emits the current events and/or expired events according to the definition. Output of window processor is received by the StreamPublishProcessor and filtered based on the output event type of the event window. Then the filtered events are sent back to the stream junction. Stream junction will forward those events to any other interested queries.
Previous
Next Post »

Contact Form

Name

Email *

Message *