Never Duplicate A Window Again - WSO2 Siddhi Event Window

Siddhi 3.1.1 introduces a new feature known as Event Window, which is similar in some aspects to the named window of Esper CEP. This article presents the application and the architecture of Event Window using a simple example. In Siddhi version 3.1.0, a window can be defined on a stream only inside a query, and its output can be used only in that same query. For example, consider a scenario where WSO2 CEP is used to analyze the sensor readings of a smart home which has multiple sensors in a number of rooms. All the sensors send their readings to an input stream named SensorStream, and different analytics components are interested in different statistics.

Suppose there are two analytics components named A and B. A is interested in the maximum reading of each sensor in every room during the last second, and B is interested in the average reading of each type of sensor in the smart home during the last second. Both requirements can be met using the queries provided below.
define stream SensorStream (name string, value float, roomNo int, deviceID string);

@info(name = 'query0')
from SensorStream#window.timeBatch(1 second)
select name, max(value) as maxValue, roomNo
group by name, roomNo
insert into MaxSensorReadingPerRoomStream;

@info(name = 'query1')
from SensorStream#window.timeBatch(1 second)
select name, avg(value) as avgValue
group by name
insert into AverageSensorReadingPerBuildingStream;
In this definition, the events that arrived in the last second are stored in two windows which are identical to each other. The output of each query is determined only by its selection and grouping. If the time interval is large enough and thousands of sensors are used in the smart home, several megabytes of memory are wasted on the duplicate events. Also, a window defined in a query cannot be reused for any other purpose, even inside the same query. To overcome these problems, Event Window is introduced as a global window which can be accessed from any query any number of times.
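The idea of sharing one window between several consumers can be sketched in plain Java. This is only an illustration of the memory argument above, not the Siddhi implementation: the class SharedWindowSketch, the Reading type and both method names are hypothetical, and the list sharedBatch simply stands in for the events collected during the last second.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; none of these names come from the Siddhi API.
class SharedWindowSketch {

    // Mirrors the attributes of SensorStream.
    static final class Reading {
        final String name;
        final float value;
        final int roomNo;
        final String deviceID;

        Reading(String name, float value, int roomNo, String deviceID) {
            this.name = name;
            this.value = value;
            this.roomNo = roomNo;
            this.deviceID = deviceID;
        }
    }

    // Component A: maximum reading per sensor name and room.
    static Map<String, Float> maxPerNameAndRoom(List<Reading> batch) {
        Map<String, Float> result = new TreeMap<>();
        for (Reading r : batch) {
            String key = r.name + "@" + r.roomNo;
            Float current = result.get(key);
            if (current == null || r.value > current) {
                result.put(key, r.value);
            }
        }
        return result;
    }

    // Component B: average reading per sensor name across the whole home.
    static Map<String, Double> averagePerName(List<Reading> batch) {
        Map<String, Double> sums = new TreeMap<>();
        Map<String, Integer> counts = new TreeMap<>();
        for (Reading r : batch) {
            Double sum = sums.get(r.name);
            Integer count = counts.get(r.name);
            sums.put(r.name, (sum == null ? 0.0 : sum) + r.value);
            counts.put(r.name, (count == null ? 0 : count) + 1);
        }
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, Double> e : sums.entrySet()) {
            result.put(e.getKey(), e.getValue() / counts.get(e.getKey()));
        }
        return result;
    }

    public static void main(String[] args) {
        // One batch, stored once, standing in for the events of the last second.
        List<Reading> sharedBatch = new ArrayList<>();
        sharedBatch.add(new Reading("temperature", 20.0f, 1, "t-1"));
        sharedBatch.add(new Reading("temperature", 24.0f, 1, "t-2"));
        sharedBatch.add(new Reading("temperature", 22.0f, 2, "t-3"));
        sharedBatch.add(new Reading("humidity", 40.0f, 1, "h-1"));

        // Both components read the same list; no event is copied.
        System.out.println(maxPerNameAndRoom(sharedBatch));
        System.out.println(averagePerName(sharedBatch));
    }
}
```

Both aggregations read the same list, so each event is held in memory once regardless of how many components consume it.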

The Siddhi queries and the architecture discussed in this article are presented according to the Siddhi version 3.1.1.


Deploy and Upgrade Android Database From External Directory

After seeing the huge response to the Import and Use External Database in Android article, I realized the importance of deploying an Android database from external sources. The library used in that article allows you to import a database only from the assets directory. However, one of my readers recently asked for a way to import and upgrade the database from the SD card. As a solution for his use case, I have developed a new library named "externalsqliteimporter" which allows you to import a database either from the assets directory or from the SD card. This article explains how to use this Android library with a sample application.


The ExternalSQLiteImporter library allows you to build your SQLite database on your desktop computer, and to import and use it in your Android application. This library has two separate ways to maintain your database.

This library is still under development. Deploying and upgrading the database from an external directory is not secure, because the directory is accessible to third-party applications as well. Use that feature with caution.


Complex Event Processing - An Introduction

Today almost all the big players in the software industry are investing in big data and data analytics. Not only large companies but also small-scale companies need data processing in order to track and lead their business. Complex Event Processing (CEP) is one of the techniques used to analyse streams of events for events or patterns of interest. This article explains the big picture of complex event processing in a nutshell using a simple example.


As the name suggests, Complex Event Processing is mainly about processing events using some predefined rules. It is mainly used to derive complex events by aggregating and processing streams of simple events. For example, identifying that you are reading this blog right now is an easy task for your computer, since the browser knows which site is currently open. However, some events cannot be detected as simply as that. For example, how could a supermarket send a teenage girl coupons timed close to her delivery date when even her father did not know that she was pregnant? (Read the incredible story here: ‘How Target Figured Out A Teen Girl Was Pregnant Before Her Father Did’).

Let’s say our problem is identifying the pregnant customers of a supermarket so that the supermarket can send them coupons for baby clothes and cribs. Assume that, according to an expert report, if a customer bought cocoa-butter lotion, a purse large enough to double as a diaper bag and ZMA supplements within 7 days, the customer is pregnant (or has a pregnant relative). Using this domain knowledge, we can define a rule to identify the pregnant customers. However, in a supermarket with thousands of customers per day, it is hard to search for such patterns manually using a rule.
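To make the rule concrete, here is a minimal plain-Java sketch that checks it for the purchases of a single customer. The class PurchaseRuleSketch, the Purchase type and the item labels are assumptions made for illustration; the sketch treats "within 7 days" as "all three items fall inside a 7-day span that starts at one of the purchases".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

// Illustration of the rule only; a CEP engine evaluates it continuously instead.
class PurchaseRuleSketch {

    // Hypothetical labels for the three items named in the rule.
    static final Set<String> ITEMS_OF_INTEREST = new HashSet<>(
            Arrays.asList("Cocoa-Butter Lotion", "Purse-XL", "ZMA"));
    static final long SEVEN_DAYS = TimeUnit.DAYS.toMillis(7);

    static final class Purchase {
        final String item;
        final long timestamp; // milliseconds

        Purchase(String item, long timestamp) {
            this.item = item;
            this.timestamp = timestamp;
        }
    }

    // True if some 7-day span contains purchases of all three items.
    static boolean matchesRule(List<Purchase> purchasesOfOneCustomer) {
        for (Purchase first : purchasesOfOneCustomer) {
            Set<String> seen = new HashSet<>();
            for (Purchase p : purchasesOfOneCustomer) {
                long offset = p.timestamp - first.timestamp;
                boolean inSpan = offset >= 0 && offset <= SEVEN_DAYS;
                if (inSpan && ITEMS_OF_INTEREST.contains(p.item)) {
                    seen.add(p.item);
                }
            }
            if (seen.size() == ITEMS_OF_INTEREST.size()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long day = TimeUnit.DAYS.toMillis(1);
        List<Purchase> alice = new ArrayList<>();
        alice.add(new Purchase("Cocoa-Butter Lotion", 0));
        alice.add(new Purchase("Beer", 1 * day));
        alice.add(new Purchase("Purse-XL", 3 * day));
        alice.add(new Purchase("ZMA", 6 * day));
        System.out.println("Alice matches the rule: " + matchesRule(alice));
    }
}
```

Running such a check by hand over every customer's purchase history is what becomes impractical at scale, which is the gap a CEP engine fills.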

A Complex Event Processor is a tool that processes streams of events automatically as they arrive. A database management system stores the data and runs each query against the stored data. In contrast, a CEP engine stores the queries, passes the incoming data through them and extracts the necessary information. There are several CEP engines available; this article uses WSO2 CEP, which is an open source, Java-based complex event processor.
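The "store the query, pass the data through it" idea can be sketched in a few lines of plain Java. This is a conceptual illustration under simplifying assumptions, not how WSO2 CEP is built: the Query interface and the StoredQuerySketch class are hypothetical names, and an event is reduced to a single item string.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch: the query is registered once and events flow through it.
class StoredQuerySketch {

    // Hypothetical stand-in for a stored query.
    interface Query {
        boolean matches(String item);
    }

    private final Query query;
    private final List<String> matches = new ArrayList<>();

    StoredQuerySketch(Query query) {
        this.query = query; // stored up front, before any data arrives
    }

    // Called once per arriving event; events that do not match are not retained.
    void onEvent(String item) {
        if (query.matches(item)) {
            matches.add(item);
        }
    }

    List<String> getMatches() {
        return matches;
    }

    public static void main(String[] args) {
        StoredQuerySketch engine = new StoredQuerySketch(new Query() {
            @Override
            public boolean matches(String item) {
                return item.equals("ZMA");
            }
        });

        // The data is pushed through the stored query as it arrives.
        for (String item : new String[] { "Beer", "ZMA", "Biscuit", "ZMA" }) {
            engine.onEvent(item);
        }
        System.out.println("Matched events: " + engine.getMatches().size());
    }
}
```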

WSO2 CEP is recommended to be used as a cluster of nodes for an enterprise level requirement. However, in this article, we use the core of WSO2 CEP known as Siddhi which can be used as a Java library.

Step 1:
Create a new Maven project in Eclipse using the following information.
Group Id: com.javahelps
Artifact Id: siddhidemo

Step 2:
Open the pom.xml and add the following repository. This repository is required to download the latest Siddhi library.
<repositories>
    <repository>
        <id>wso2.releases</id>
        <name>WSO2 internal Repository</name>
        <url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>daily</updatePolicy>
            <checksumPolicy>ignore</checksumPolicy>
        </releases>
    </repository>
</repositories>

Step 3:
Add the following dependencies. (Java version is optional)
<properties>
    <siddhi.version>3.1.0</siddhi.version>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
</properties>

<dependencies>
    <!--Siddhi -->
    <dependency>
        <groupId>org.wso2.siddhi</groupId>
        <artifactId>siddhi-query-api</artifactId>
        <version>${siddhi.version}</version>
    </dependency>
    <dependency>
        <groupId>org.wso2.siddhi</groupId>
        <artifactId>siddhi-core</artifactId>
        <version>${siddhi.version}</version>
    </dependency>
    <dependency>
        <groupId>org.wso2.siddhi</groupId>
        <artifactId>siddhi-query-compiler</artifactId>
        <version>${siddhi.version}</version>
    </dependency>
</dependencies>

After modification, the pom.xml must look like this:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javahelps</groupId>
    <artifactId>siddhidemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>wso2.releases</id>
            <name>WSO2 internal Repository</name>
            <url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>daily</updatePolicy>
                <checksumPolicy>ignore</checksumPolicy>
            </releases>
        </repository>
    </repositories>

    <properties>
        <siddhi.version>3.1.0</siddhi.version>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
    </properties>

    <dependencies>
        <!--Siddhi -->
        <dependency>
            <groupId>org.wso2.siddhi</groupId>
            <artifactId>siddhi-query-api</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.wso2.siddhi</groupId>
            <artifactId>siddhi-core</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.wso2.siddhi</groupId>
            <artifactId>siddhi-query-compiler</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
    </dependencies>
</project>

Step 4:
Create a new package com.javahelps.siddhidemo in src/main/java folder.

Step 5:
Create a new class named Main.java with the main method.

Step 6:
Create a SiddhiManager instance which is used to create an execution plan.
package com.javahelps.siddhidemo;

import org.wso2.siddhi.core.SiddhiManager;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();
    }
}

Step 7:
Define a stream as shown below. Here stream is used to define the properties of the input event.
package com.javahelps.siddhidemo;

import org.wso2.siddhi.core.SiddhiManager;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();

        String streams = "define stream purchaseStream (customerName string, item string, timestamp long); ";
    }
}
The above definition says that every event passed to purchaseStream has the properties customerName and item, which are strings, and timestamp, which holds the time of the purchase as a long.

Step 8:
The most important step is creating the query.
Here is my logic:
  1. Take all the events in the stream and partition them by customer name, because there can be many customers and we need to process each customer's data individually.
  2. From every partition, filter the purchases of the items of interest.
  3. Break the events further into subsets where each subset contains only the purchases that happened within a 7-day interval. For example, if Alice purchased Cocoa-Butter Lotion on Monday, Biscuit on Saturday and Beer on the next Tuesday, the Cocoa-Butter Lotion and Biscuit purchases must be in one subset and the Beer purchase must be in another.
  4. From each subset of events, take the count of unique items. The count can range from 1 to 3, since we have already filtered the stream to allow purchases of the three items only.
  5. Those who have purchased all three products are considered pregnant with 100% confidence, those who have bought only two with 66.67% confidence, and those who have bought only one with 33.33% confidence.
The Siddhi query for the above logic is given below:
partition with (customerName of purchaseStream)
begin
    from purchaseStream[item == 'Cocoa-Butter Lotion' OR item == 'Purse-XL' OR item == 'ZMA']#window.uniqueExternalTimeBatch(item, timestamp, 500 milliseconds)
    select customerName, convert(count(item), 'double') / 3.0 * 100.0 as noOfPurchases insert into possiblePregnant;
end;

1. The input stream is partitioned by customerName.

2. The partitioned stream is filtered to allow only Cocoa-Butter Lotion, Purse-XL and ZMA.

3. Siddhi offers uniqueExternalTimeBatch, which collects the events into time-framed batches based on the timestamp attribute and keeps only unique items within each batch. An interval of 500 milliseconds is used instead of 7 days so that the demo responds quickly.

4. Select the customerName and count of items from each subset and insert the possible pregnant customers with the level of confidence into the output stream.
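The arithmetic behind the confidence value can be checked with a small plain-Java sketch. It mirrors only the select clause (unique item count divided by 3, times 100) for one batch of one customer; the class name ConfidenceSketch and the method confidence are hypothetical, and the batching itself is left to Siddhi.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Mirrors the arithmetic of the select clause for a single batch of one customer.
class ConfidenceSketch {

    static final Set<String> ITEMS_OF_INTEREST = new HashSet<>(
            Arrays.asList("Cocoa-Butter Lotion", "Purse-XL", "ZMA"));

    // Percentage of the three items of interest that appear in the batch.
    static double confidence(List<String> itemsInBatch) {
        Set<String> unique = new HashSet<>();
        for (String item : itemsInBatch) {
            if (ITEMS_OF_INTEREST.contains(item)) {
                unique.add(item); // duplicates and other items are ignored
            }
        }
        return unique.size() / 3.0 * 100.0;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("ZMA", "Beer", "Purse-XL", "ZMA");
        System.out.println(String.format("%.2f%% confidence", confidence(batch)));
    }
}
```

A batch containing two of the three items yields 2 / 3.0 * 100.0, which is the 66.67% case described in the logic above.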

Step 9:
Add the stream definition and the query to the Siddhi manager and create the ExecutionPlanRuntime.
package com.javahelps.siddhidemo;

import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();

        String streams = "define stream purchaseStream (customerName string, item string, timestamp long); ";
        String query = "partition with (customerName of purchaseStream) " +
                        "begin " +
                        "from purchaseStream[item == 'Cocoa-Butter Lotion' OR item == 'Purse-XL' OR item == 'ZMA']#window.uniqueExternalTimeBatch(item, timestamp, 500 milliseconds) " +
                        "select customerName, convert(count(item), 'double') / 3.0 * 100.0 as noOfPurchases insert into possiblePregnant; " +
                        "end ";

        // Create ExecutionPlanRuntime using stream definition and query
        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(streams + query);
    }
}

Step 10:
Create two static arrays, one for the customers and the other for the items, along with a static Random instance, and add a new method to generate input events.
package com.javahelps.siddhidemo;

import java.util.Random;

import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;

public class Main {
    private static final String[] CUSTOMERS = { "Alice", "Barby", "Carol", "Diana" };
    private static final String[] ITEMS = { "Cocoa-Butter Lotion", "Purse-XL", "Purse-L", "Beer", "Biscuit",
            "Chocolate", "ZMA" };
    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();

        String streams = "define stream purchaseStream (customerName string, item string, timestamp long); ";
        String query = "partition with (customerName of purchaseStream) " +
                        "begin " +
                        "from purchaseStream[item == 'Cocoa-Butter Lotion' OR item == 'Purse-XL' OR item == 'ZMA']#window.uniqueExternalTimeBatch(item, timestamp, 500 milliseconds) " +
                        "select customerName, convert(count(item), 'double') / 3.0 * 100.0 as noOfPurchases insert into possiblePregnant; " +
                        "end ";

        // Create ExecutionPlanRuntime using stream definition and query
        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(streams + query);
    }

    private static Object[] generateEvent() {
        String name = CUSTOMERS[RANDOM.nextInt(CUSTOMERS.length)];
        String item = ITEMS[RANDOM.nextInt(ITEMS.length)];
        long time = System.currentTimeMillis(); // Current time

        System.out.println(name + " buys " + item);
        Object[] event = new Object[] { name, item, time };
        return event;
    }
}

Step 11:
Add a StreamCallback to receive the output events and an InputHandler to send the input events.
package com.javahelps.siddhidemo;

import java.util.Random;

import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;

public class Main {
    private static final String[] CUSTOMERS = { "Alice", "Barby", "Carol", "Diana" };
    private static final String[] ITEMS = { "Cocoa-Butter Lotion", "Purse-XL", "Purse-L", "Beer", "Biscuit",
            "Chocolate", "ZMA" };
    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();

        String streams = "define stream purchaseStream (customerName string, item string, timestamp long); ";
        String query = "partition with (customerName of purchaseStream) " +
                        "begin " +
                        "from purchaseStream[item == 'Cocoa-Butter Lotion' OR item == 'Purse-XL' OR item == 'ZMA']#window.uniqueExternalTimeBatch(item, timestamp, 500 milliseconds) " +
                        "select customerName, convert(count(item), 'double') / 3.0 * 100.0 as noOfPurchases insert into possiblePregnant; " +
                        "end ";

        // Create ExecutionPlanRuntime using stream definition and query
        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(streams + query);
       
        try {
            // Receive the output events
            executionPlanRuntime.addCallback("possiblePregnant", new StreamCallback() {
                @Override
                public void receive(Event[] events) {
                    String output = String.format("\t\t\t%s is pregnant with %.2f%% confidence.", events[0].getData(0),
                            events[0].getData(1));
                    System.out.println(output);
                }
            });

            // Send input events
            InputHandler purchaseStream = executionPlanRuntime.getInputHandler("purchaseStream");
            executionPlanRuntime.start();
            for (int i = 0; i < 1000; i++) {
                Object[] event = generateEvent();
                purchaseStream.send(event);
                Thread.sleep(10); // Delay for 10 milliseconds
            }
        } finally {
            executionPlanRuntime.shutdown();
        }
    }

    private static Object[] generateEvent() {
        String name = CUSTOMERS[RANDOM.nextInt(CUSTOMERS.length)];
        String item = ITEMS[RANDOM.nextInt(ITEMS.length)];
        long time = System.currentTimeMillis(); // Current time

        System.out.println(name + " buys " + item);
        Object[] event = new Object[] { name, item, time };
        return event;
    }

}

Step 12:
Save all the changes and run the application.

The above query is a sample implementation for the given scenario; there can be other ways to achieve the same output using CEP. For more details about Siddhi, see the Siddhi Documentation. This example is a simple application of CEP to process a stream of events; however, CEP can also be used in complex scenarios such as weather prediction, stock market trend analysis and fraud detection in bank transactions. If you have any questions or suggestions, please comment below.

Find the project on GitHub.
