Complex Event Processing - An Introduction

Today, almost all the major players in the software industry are investing in big data and data analytics. Small companies, not just large ones, also need data processing to track and steer their business. Complex Event Processing (CEP) is one of the techniques used to analyse streams of events for events or patterns of interest. This article gives the big picture of complex event processing using a simple example.


As the name suggests, Complex Event Processing is mainly about processing events using predefined rules. It is mostly used to derive complex events by aggregating and processing streams of simple events. Some events are easy to detect: identifying that you are reading this blog right now is an easy task for your computer, since the browser knows which site is currently open. However, other events cannot be detected that simply. For example, how could a supermarket send coupons timed close to a teen girl’s delivery date when even her father didn’t know that she was pregnant? (Read the incredible story here: ‘How Target Figured Out A Teen Girl Was Pregnant Before Her Father Did’).

Let’s say our problem is identifying the pregnant customers of a supermarket so that the supermarket can send them coupons for baby clothes and cribs. Assume that, according to a report by experts, a customer who buys cocoa-butter lotion, a purse large enough to double as a diaper bag, and ZMA supplements within 7 days is pregnant (or has a pregnant relative). Using this domain knowledge, we can define a rule to identify the pregnant customers. However, in a supermarket with thousands of customers per day, it is hard to apply such a rule manually.
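
To make the rule concrete before bringing in a CEP engine, here is a minimal plain-Java sketch of it. The class name PregnancyRuleSketch, the Purchase helper type and the item labels (borrowed from the query used later in this article) are assumptions for illustration only. It checks the purchases of a single customer and treats "within 7 days" as a span of at most 7 days starting at any purchase of interest.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

final class PregnancyRuleSketch {
    // The three items named by the rule (labels are an assumption)
    static final Set<String> ITEMS_OF_INTEREST = new HashSet<String>(
            Arrays.asList("Cocoa-Butter Lotion", "Purse-XL", "ZMA"));
    static final long WINDOW_MILLIS = TimeUnit.DAYS.toMillis(7);

    /** One purchase made by a single customer (hypothetical helper type). */
    static final class Purchase {
        final String item;
        final long timestamp; // milliseconds

        Purchase(String item, long timestamp) {
            this.item = item;
            this.timestamp = timestamp;
        }
    }

    /** Returns true if some 7-day span contains all three items of interest. */
    static boolean matches(List<Purchase> purchases) {
        for (Purchase start : purchases) {
            if (!ITEMS_OF_INTEREST.contains(start.item)) {
                continue; // a matching span always starts at an item of interest
            }
            Set<String> seen = new HashSet<String>();
            for (Purchase p : purchases) {
                long offset = p.timestamp - start.timestamp;
                if (offset >= 0 && offset <= WINDOW_MILLIS && ITEMS_OF_INTEREST.contains(p.item)) {
                    seen.add(p.item);
                }
            }
            if (seen.size() == ITEMS_OF_INTEREST.size()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long day = TimeUnit.DAYS.toMillis(1);
        List<Purchase> purchases = Arrays.asList(
                new Purchase("Cocoa-Butter Lotion", 0L),
                new Purchase("Purse-XL", 2 * day),
                new Purchase("ZMA", 6 * day));
        System.out.println(matches(purchases)); // prints true
    }
}
```

This brute-force check rescans all purchases for every candidate start, which is fine for one customer but does not scale to a live stream of thousands of customers. That gap is what the CEP engine fills.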

A complex event processor is a tool that processes streams of events automatically as they arrive. A database management system stores the data and runs each query against the stored data. In contrast, a CEP engine stores the query and runs the incoming data through it to extract the necessary information. There are several CEP engines available; this article uses WSO2 CEP, which is an open source, Java-based complex event processor.
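
The "store the query, pass the data" idea can be illustrated with a few lines of plain Java. This is only a toy sketch, not how WSO2 CEP or Siddhi is implemented; the names StandingQuerySketch and Condition are made up for the illustration. The condition is registered once, and every event that arrives afterwards is tested against it immediately.

```java
import java.util.ArrayList;
import java.util.List;

final class StandingQuerySketch {
    /** The stored "query": a condition evaluated for every arriving event. */
    interface Condition {
        boolean accepts(String item);
    }

    private final Condition condition;
    private final List<String> matches = new ArrayList<String>();

    StandingQuerySketch(Condition condition) {
        this.condition = condition; // the query is stored up front
    }

    /** Events are pushed through the stored query as they arrive. */
    void send(String item) {
        if (condition.accepts(item)) {
            matches.add(item);
        }
    }

    List<String> matches() {
        return matches;
    }

    public static void main(String[] args) {
        StandingQuerySketch engine = new StandingQuerySketch(new Condition() {
            @Override
            public boolean accepts(String item) {
                return "ZMA".equals(item);
            }
        });
        for (String item : new String[] { "Beer", "ZMA", "Biscuit" }) {
            engine.send(item);
        }
        System.out.println(engine.matches()); // prints [ZMA]
    }
}
```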

For enterprise-level requirements, WSO2 CEP is recommended to be run as a cluster of nodes. In this article, however, we use only the core of WSO2 CEP, known as Siddhi, which can be used as a Java library.

Step 1:
Create a new Maven project in Eclipse using the following information.
Group Id: com.javahelps
Artifact Id: siddhidemo

Step 2:
Open pom.xml and add the following repository. This repository is required to download the Siddhi library.
<repositories>
    <repository>
        <id>wso2.releases</id>
        <name>WSO2 internal Repository</name>
        <url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>daily</updatePolicy>
            <checksumPolicy>ignore</checksumPolicy>
        </releases>
    </repository>
</repositories>

Step 3:
Add the following properties and dependencies. (Setting the Java version is optional.)
<properties>
    <siddhi.version>3.1.0</siddhi.version>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
</properties>

<dependencies>
    <!--Siddhi -->
    <dependency>
        <groupId>org.wso2.siddhi</groupId>
        <artifactId>siddhi-query-api</artifactId>
        <version>${siddhi.version}</version>
    </dependency>
    <dependency>
        <groupId>org.wso2.siddhi</groupId>
        <artifactId>siddhi-core</artifactId>
        <version>${siddhi.version}</version>
    </dependency>
    <dependency>
        <groupId>org.wso2.siddhi</groupId>
        <artifactId>siddhi-query-compiler</artifactId>
        <version>${siddhi.version}</version>
    </dependency>
</dependencies>

After these modifications, pom.xml should look like this:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javahelps</groupId>
    <artifactId>siddhidemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>wso2.releases</id>
            <name>WSO2 internal Repository</name>
            <url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>daily</updatePolicy>
                <checksumPolicy>ignore</checksumPolicy>
            </releases>
        </repository>
    </repositories>

    <properties>
        <siddhi.version>3.1.0</siddhi.version>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
    </properties>

    <dependencies>
        <!--Siddhi -->
        <dependency>
            <groupId>org.wso2.siddhi</groupId>
            <artifactId>siddhi-query-api</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.wso2.siddhi</groupId>
            <artifactId>siddhi-core</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.wso2.siddhi</groupId>
            <artifactId>siddhi-query-compiler</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
    </dependencies>
</project>

Step 4:
Create a new package com.javahelps.siddhidemo in the src/main/java folder.

Step 5:
Create a new class named Main with a main method.

Step 6:
Create a SiddhiManager instance which is used to create an execution plan.
package com.javahelps.siddhidemo;

import org.wso2.siddhi.core.SiddhiManager;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();
    }
}

Step 7:
Define a stream as shown below. Here, the stream definition describes the attributes of the input events.
package com.javahelps.siddhidemo;

import org.wso2.siddhi.core.SiddhiManager;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();

        String streams = "define stream purchaseStream (customerName string, item string, timestamp long); ";
    }
}
The above definition says that every event passed to purchaseStream has the attributes customerName and item, which are strings, and timestamp, which is the time of purchase as a long.
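
An event for this stream is sent as an Object[] whose values follow the attribute order of the definition, which is what the generateEvent method later in this article does. The helper below is a small sketch of that layout; the class name PurchaseEventSketch and the index constants are hypothetical and not part of Siddhi.

```java
final class PurchaseEventSketch {
    // Positions mirror: purchaseStream (customerName string, item string, timestamp long)
    static final int CUSTOMER_NAME = 0;
    static final int ITEM = 1;
    static final int TIMESTAMP = 2;

    /** Builds the data array for one purchase in definition order. */
    static Object[] toEvent(String customerName, String item, long timestamp) {
        // The long is boxed to java.lang.Long when stored in the Object[]
        return new Object[] { customerName, item, timestamp };
    }

    public static void main(String[] args) {
        Object[] event = toEvent("Alice", "ZMA", System.currentTimeMillis());
        System.out.println(event[CUSTOMER_NAME] + " buys " + event[ITEM]); // prints Alice buys ZMA
    }
}
```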

Step 8:
The most important step is creating the query.
Here is my logic:
  1. Take the incoming events and partition them by customer name, because there are many customers and we need to process each customer’s data individually.
  2. In every partition, keep only the purchases of the items of interest.
  3. Break the remaining events into subsets, where each subset contains only the purchases made within one 7-day interval. For example, if Alice purchased Cocoa-Butter Lotion on Monday, Biscuit on Saturday and Beer on the next Tuesday, the Cocoa-Butter Lotion and Biscuit purchases must be in one subset and the Beer purchase must be in another. (Biscuit and Beer only illustrate the time split here; the filter in step 2 would already have dropped them.)
  4. From each subset of events, take the count of unique items. The count can range from 1 to 3, since we have already filtered the stream to allow purchases of the three items only.
  5. Those who have purchased all three products are pregnant with 100% confidence. Those who have bought only two items are pregnant with 66.67% confidence. Those who have purchased only one product are pregnant with 33.33% confidence.
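
Before looking at the Siddhi version, the five steps above can be sketched in plain Java. The names PipelineSketch and Purchase are hypothetical. The sketch also assumes that purchases arrive in timestamp order and that each customer’s intervals are counted from that customer’s first purchase of interest, which may differ from how a real CEP window aligns its batches.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class PipelineSketch {
    static final Set<String> ITEMS_OF_INTEREST = new HashSet<String>(
            Arrays.asList("Cocoa-Butter Lotion", "Purse-XL", "ZMA"));

    /** One purchase event (hypothetical helper type). */
    static final class Purchase {
        final String customer;
        final String item;
        final long timestamp;

        Purchase(String customer, String item, long timestamp) {
            this.customer = customer;
            this.item = item;
            this.timestamp = timestamp;
        }
    }

    /** Maps "customer#intervalIndex" to a confidence percentage. */
    static Map<String, Double> confidences(List<Purchase> purchases, long windowMillis) {
        Map<String, Long> firstSeen = new LinkedHashMap<String, Long>();
        Map<String, Set<String>> uniqueItems = new LinkedHashMap<String, Set<String>>();
        for (Purchase p : purchases) {
            if (!ITEMS_OF_INTEREST.contains(p.item)) {
                continue; // step 2: keep only the items of interest
            }
            Long first = firstSeen.get(p.customer); // step 1: state is kept per customer
            if (first == null) {
                first = p.timestamp;
                firstSeen.put(p.customer, first);
            }
            long interval = (p.timestamp - first) / windowMillis; // step 3: fixed-length intervals
            String key = p.customer + "#" + interval;
            Set<String> items = uniqueItems.get(key);
            if (items == null) {
                items = new HashSet<String>();
                uniqueItems.put(key, items);
            }
            items.add(p.item); // step 4: a set keeps each item once
        }
        Map<String, Double> result = new LinkedHashMap<String, Double>();
        for (Map.Entry<String, Set<String>> entry : uniqueItems.entrySet()) {
            result.put(entry.getKey(), entry.getValue().size() / 3.0 * 100.0); // step 5
        }
        return result;
    }
}
```

For instance, a customer who buys Cocoa-Butter Lotion and ZMA in the first interval and Purse-XL in the second gets two entries: roughly 66.67 for the first interval and 33.33 for the second.
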
The Siddhi query for the above logic is given below:
partition with (customerName of purchaseStream)
begin
    from purchaseStream[item == 'Cocoa-Butter Lotion' OR item == 'Purse-XL' OR item == 'ZMA']#window.uniqueExternalTimeBatch(item, timestamp, 500 milliseconds)
    select customerName, convert(count(item), 'double') / 3.0 * 100.0 as noOfPurchases insert into possiblePregnant;
end;

1. The input stream is partitioned by customerName.

2. The partitioned stream is filtered to allow only Cocoa-Butter Lotion, Purse-XL and ZMA.

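3. Siddhi offers the uniqueExternalTimeBatch window, which can be used to get time-framed subsets of events that are unique on a given attribute (here, item), with time taken from the timestamp attribute of the events. An interval of 500 milliseconds is used instead of 7 days so that the demo responds quickly.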

4. Select the customerName and the confidence level, computed from the count of items in each subset, and insert the possibly pregnant customers into the output stream possiblePregnant.
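
Since the 500 millisecond interval is only a demo shortcut, it may help to keep the interval as a parameter. The helper below is a sketch that builds the same query text with the interval in milliseconds; the class name WindowQuerySketch and the method buildQuery are hypothetical. It assumes Siddhi accepts a larger millisecond value in the same position, which should be verified against the Siddhi version in use.

```java
import java.util.concurrent.TimeUnit;

final class WindowQuerySketch {
    /** Builds the partition query with the batch interval given in milliseconds. */
    static String buildQuery(long windowMillis) {
        return "partition with (customerName of purchaseStream) "
                + "begin "
                + "from purchaseStream[item == 'Cocoa-Butter Lotion' OR item == 'Purse-XL' OR item == 'ZMA']"
                + "#window.uniqueExternalTimeBatch(item, timestamp, " + windowMillis + " milliseconds) "
                + "select customerName, convert(count(item), 'double') / 3.0 * 100.0 as noOfPurchases "
                + "insert into possiblePregnant; "
                + "end ";
    }

    public static void main(String[] args) {
        System.out.println(buildQuery(500L));                      // the demo interval
        System.out.println(buildQuery(TimeUnit.DAYS.toMillis(7))); // 7 days = 604800000 milliseconds
    }
}
```

With a 7-day interval, the timestamps of the test events would also need to span several days, because the window follows the timestamp attribute of the events.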

Step 9:
Add the stream definition and the query to the Siddhi manager and create the ExecutionPlanRuntime.
package com.javahelps.siddhidemo;

import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();

        String streams = "define stream purchaseStream (customerName string, item string, timestamp long); ";
        String query = "partition with (customerName of purchaseStream) " +
                        "begin " +
                        "from purchaseStream[item == 'Cocoa-Butter Lotion' OR item == 'Purse-XL' OR item == 'ZMA']#window.uniqueExternalTimeBatch(item, timestamp, 500 milliseconds) " +
                        "select customerName, convert(count(item), 'double') / 3.0 * 100.0 as noOfPurchases insert into possiblePregnant; " +
                        "end ";

        // Create ExecutionPlanRuntime using stream definition and query
        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(streams + query);
    }
}

Step 10:
Create two static arrays, one for customers and the other for items, along with a static Random instance, and add a new method to generate input events.
package com.javahelps.siddhidemo;

import java.util.Random;

import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;

public class Main {
    private static final String[] CUSTOMERS = { "Alice", "Barby", "Carol", "Diana" };
    private static final String[] ITEMS = { "Cocoa-Butter Lotion", "Purse-XL", "Purse-L", "Beer", "Biscuit",
            "Chocolate", "ZMA" };
    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();

        String streams = "define stream purchaseStream (customerName string, item string, timestamp long); ";
        String query = "partition with (customerName of purchaseStream) " +
                        "begin " +
                        "from purchaseStream[item == 'Cocoa-Butter Lotion' OR item == 'Purse-XL' OR item == 'ZMA']#window.uniqueExternalTimeBatch(item, timestamp, 500 milliseconds) " +
                        "select customerName, convert(count(item), 'double') / 3.0 * 100.0 as noOfPurchases insert into possiblePregnant; " +
                        "end ";

        // Create ExecutionPlanRuntime using stream definition and query
        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(streams + query);
    }

    private static Object[] generateEvent() {
        String name = CUSTOMERS[RANDOM.nextInt(CUSTOMERS.length)];
        String item = ITEMS[RANDOM.nextInt(ITEMS.length)];
        long time = System.currentTimeMillis(); // Current time

        System.out.println(name + " buys " + item);
        Object[] event = new Object[] { name, item, time };
        return event;
    }
}
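
Because generateEvent picks customers and items at random, every run produces a different sequence of purchases. When a repeatable run is wanted, for example while debugging the query, one option is to seed the Random and pass the timestamp in explicitly. The sketch below shows the idea; the class name SeededEventSketch is hypothetical.

```java
import java.util.Random;

final class SeededEventSketch {
    private static final String[] CUSTOMERS = { "Alice", "Barby", "Carol", "Diana" };
    private static final String[] ITEMS = { "Cocoa-Butter Lotion", "Purse-XL", "Purse-L", "Beer", "Biscuit",
            "Chocolate", "ZMA" };

    private final Random random;

    SeededEventSketch(long seed) {
        this.random = new Random(seed); // the same seed gives the same sequence of picks
    }

    /** Generates one purchase event with the given timestamp. */
    Object[] next(long timestamp) {
        String name = CUSTOMERS[random.nextInt(CUSTOMERS.length)];
        String item = ITEMS[random.nextInt(ITEMS.length)];
        return new Object[] { name, item, timestamp };
    }

    public static void main(String[] args) {
        SeededEventSketch generator = new SeededEventSketch(42L);
        for (int i = 0; i < 5; i++) {
            Object[] event = generator.next(i * 10L);
            System.out.println(event[0] + " buys " + event[1] + " at " + event[2]);
        }
    }
}
```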

Step 11:
Add a StreamCallback to receive the output events and an InputHandler to send the input events.
package com.javahelps.siddhidemo;

import java.util.Random;

import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;

public class Main {
    private static final String[] CUSTOMERS = { "Alice", "Barby", "Carol", "Diana" };
    private static final String[] ITEMS = { "Cocoa-Butter Lotion", "Purse-XL", "Purse-L", "Beer", "Biscuit",
            "Chocolate", "ZMA" };
    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();

        String streams = "define stream purchaseStream (customerName string, item string, timestamp long); ";
        String query = "partition with (customerName of purchaseStream) " +
                        "begin " +
                        "from purchaseStream[item == 'Cocoa-Butter Lotion' OR item == 'Purse-XL' OR item == 'ZMA']#window.uniqueExternalTimeBatch(item, timestamp, 500 milliseconds) " +
                        "select customerName, convert(count(item), 'double') / 3.0 * 100.0 as noOfPurchases insert into possiblePregnant; " +
                        "end ";

        // Create ExecutionPlanRuntime using stream definition and query
        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(streams + query);
       
        try {
            // Receive the output events
            executionPlanRuntime.addCallback("possiblePregnant", new StreamCallback() {
                @Override
                public void receive(Event[] events) {
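                    // The callback receives an array, so report every event rather than only the first
                    for (Event event : events) {
                        String output = String.format("\t\t\t%s is pregnant with %.2f%% confidence.",
                                event.getData(0), event.getData(1));
                        System.out.println(output);
                    }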
                }
            });

            // Send input events
            InputHandler purchaseStream = executionPlanRuntime.getInputHandler("purchaseStream");
            executionPlanRuntime.start();
            for (int i = 0; i < 1000; i++) {
                Object[] event = generateEvent();
                purchaseStream.send(event);
                Thread.sleep(10); // Delay for 10 milliseconds
            }
        } finally {
            executionPlanRuntime.shutdown();
        }
    }

    private static Object[] generateEvent() {
        String name = CUSTOMERS[RANDOM.nextInt(CUSTOMERS.length)];
        String item = ITEMS[RANDOM.nextInt(ITEMS.length)];
        long time = System.currentTimeMillis(); // Current time

        System.out.println(name + " buys " + item);
        Object[] event = new Object[] { name, item, time };
        return event;
    }

}

Step 12:
Save all the changes and run the application.

The above query is one sample implementation for the given scenario; there can be other ways to achieve the same output using CEP. For more details about Siddhi, see the Siddhi Documentation. This example is a simple application of CEP to process a stream of events; however, CEP can also be used in complex scenarios such as weather prediction, stock market trend analysis and fraud detection in bank transactions. If you have any questions or suggestions, please comment below.

Find the project on GitHub.