Detect Absence of Events - WSO2 Siddhi Pattern

Detect Absence of Events - WSO2 Siddhi Pattern

WSO2 Siddhi an opensource complex event processing engine which is used to power the WSO2 Analytics Server received a new feature from GSoC project: Non-Occurrence of Events for Siddhi Patterns. Until Siddhi 3.x, patterns can detect only the events that have arrived but building patterns based on events that have not arrived is an essential requirement and even there were questions on StackOverflow regarding this problem:
This article introduces the new pattern to detect absence of events along with its limitations and sample use cases. In order to get a clear idea, let's begin with a sample use case.
This feature is available from Siddhi v4.0.0-M50. Those who prefer stable version have to wait until Siddhi v4.0.0.
Read More
Why should I have super type reference & sub class object?

Why should I have super type reference & sub class object?

Just now I've got an email from one of my student with the following question: "Why should we create Animal obj = new Dog(); instead of Dog obj = new Dog();" Of course the example given here is made by myself but the question in detail is why all use super interface or super class reference instead of using the same class reference. You got the question right? This article answers the question.

The answer is: "It is a best practice being followed by our ancestors. Stop asking questions and code that way :-)" 

Ok, more seriously, consider the following example:

There is a SuperFancyClass created by developer A for some super fancy purposes. Note that all the references are HashMap.
import java.util.HashMap;

public class SuperFancyClass {
    
    private HashMap<String, Object> mapOne;
    private HashMap<Integer, Object> mapTwo;

    public SuperFancyClass() {
        this.mapOne = new HashMap<>();
        this.mapTwo = new HashMap<>();
    }

    public HashMap<String, Object> doStuff1() {
        // Do something
        return this.mapOne;
    }

    public HashMap<String, Object> doStuff2() {
        // Do something
        return this.mapOne;
    }

    public HashMap<Integer, Object> doStuff3() {
        // Do something
        return this.mapTwo;
    }

    // Some other highly complex code here
}
That class is being used by developer B as given below.
import java.util.HashMap;

public class SensitiveAgent {

    public static void main(String[] args) {
        
        SuperFancyClass superFancy = new SuperFancyClass();
        HashMap<String, Object> map1 = superFancy.doStuff1();
        HashMap<String, Object> map2 = superFancy.doStuff2();
        HashMap<Integer, Object> map3 = superFancy.doStuff3();
    }

    // Some other highly complex code here
}
Assuming both classes are highly complex but both developers A and B are happy with what they have, there is nothing else to worry.

As time progress, now developer A wants to make his Maps to maintain the insertion order. The Java API says, HashMap does not respect the insertion order so if you need insertion order, you need to switch to LinkedHashMap.

So now developer A wants to find and replace all of his/her HashMaps by LinkedHashMap. But this will not be such a short happy ending story in a highly complex code base where there can be several other local HashMaps which cannot be replaced by LinkedHashMap. So in reality, developer A has to go and change everywhere it requires to LinkedHashMap as provided below.
import java.util.LinkedHashMap;

public class SuperFancyClass {
    
    private LinkedHashMap<String, Object> mapOne;
    private LinkedHashMap<Integer, Object> mapTwo;

    public SuperFancyClass() {
        this.mapOne = new LinkedHashMap<>();
        this.mapTwo = new LinkedHashMap<>();
    }

    public LinkedHashMap<String, Object> doStuff1() {
        // Do something
        return this.mapOne;
    }

    public LinkedHashMap<String, Object> doStuff2() {
        // Do something
        return this.mapOne;
    }

    public LinkedHashMap<Integer, Object> doStuff3() {
        // Do something
        return this.mapTwo;
    }

    // Some other highly complex code here
}
But now the code of developer B collapses because of the changes done in SuperFancyClass. Then developer B has to go and change the references from HashMap to LinkedHashMap as shown below.
import java.util.LinkedHashMap;

public class SensitiveAgent {

    public static void main(String[] args) {
        
        SuperFancyClass superFancy = new SuperFancyClass();
        LinkedHashMap<String, Object> map1 = superFancy.doStuff1();
        LinkedHashMap<String, Object> map2 = superFancy.doStuff2();
        LinkedHashMap<Integer, Object> map3 = superFancy.doStuff3();
    }

    // Some other highly complex code here
}

Rule of debugging:

Now, this is the time to introduce an important rule of debugging:
Fixing a bug is equivalent to making several other bugs.
In other words, if you modify an existing code, the number of changes you have made will be proportional to the number of newly expected bugs because we are all human and we do mistakes. Without our intention, we might delete a local variable that hides an instance variable. Or else we might modify that single line which causes to bring the entire world to end. So always keep your changes as less as possible.

Coming back to the previous scenario, developer A modifies the SuperFancyClass in 7 places and developer B has to modify the SensitiveAgent in 3 places. Even in a code-base which contains two dummy classes, there are 10 places altogether to modify. Just imagine a project with hundreds/thousands of classes and millions of lines. Yep they do exists and it will be a catastrophic.

Time to travel back...
Suppose if the developer A designed his/her class as given below using super interface references (However the objects must be created using subclass. See the constructor):
import java.util.Map;
import java.util.HashMap;

public class SuperFancyClass {
    
    private Map<String, Object> mapOne;
    private Map<Integer, Object> mapTwo;

    public SuperFancyClass() {
        this.mapOne = new HashMap<>();
        this.mapTwo = new HashMap<>();
    }

    public Map<String, Object> doStuff1() {
        // Do something
        return this.mapOne;
    }

    public Map<String, Object> doStuff2() {
        // Do something
        return this.mapOne;
    }

    public Map<Integer, Object> doStuff3() {
        // Do something
        return this.mapTwo;
    }

    // Some other highly complex code here
}
The developer B would developed his/her code like this because the return types of those methods are Map; the super interface:
import java.util.Map;

public class SensitiveAgent {

    public static void main(String[] args) {
        
        SuperFancyClass superFancy = new SuperFancyClass();
        Map<String, Object> map1 = superFancy.doStuff1();
        Map<String, Object> map2 = superFancy.doStuff2();
        Map<Integer, Object> map3 = superFancy.doStuff3();
    }

    // Some other highly complex code here
}
Now if the same situation comes where the developer A wants to preserve insertion order in his/her maps, only lines he/she has to change are just those two lines inside the constructor as shown below:
import java.util.Map;
import java.util.LinkedHashMap;

public class SuperFancyClass {
    
    private Map<String, Object> mapOne;
    private Map<Integer, Object> mapTwo;

    public SuperFancyClass() {
        this.mapOne = new LinkedHashMap<>();
        this.mapTwo = new LinkedHashMap<>();
    }

    public Map<String, Object> doStuff1() {
        // Do something
        return this.mapOne;
    }

    public Map<String, Object> doStuff2() {
        // Do something
        return this.mapOne;
    }

    public Map<Integer, Object> doStuff3() {
        // Do something
        return this.mapTwo;
    }

    // Some other highly complex code here
}
Developer B does not need to change anything because there is nothing changed int the API level (return types are not changed by developer A). So now the same behavior is achieved with less number of modifications (This time only 2). This is what we call extensible, modifiable, (all those blah-blah-blah-able) code.

Then why not Object instead of Map?
Now you may have a question: Should we always use the super most interface/class as the reference? Not always. It depends on the requirements. Climbing towards super types means, we are limiting the features because 99% of the time, sub classes have more features than super types. For example, compare java.lang.Number with its sub classes or compare java.util.Collection with its sub interfaces/classes. So use the super type reference which has all the functionalities you are expected to have.

Does the rule applied to local variables?
Then what is the advantage of using super reference in local variables which have no impact on others? I guess this is because we developers are used to that and there is nothing wrong in following the same practice in local variables. For example, consider this case:
import java.util.List;
import java.util.ArrayList;

public class HelloWorld {
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        list.add("Java");
        list.add("Python");
        list.add("C++");
        System.out.println(list);
    }
}
In this code there is no advantage of using super interface reference but I do write this way because I am used to it. If there is no disadvantages, why do you bother about it? Keep calm and code your references in super type :-)
Read More

Microservices Framework for Java (MSF4J) - HelloWorld!

In a recent article: Microservices in a minute, I have introduced a lightweight microservice framework: WSO2 MSF4J. That tutorial shows you how to create a microservice in minutes using the Maven archetype. However, the libraries available in the public Maven repositories are bit older and there are new releases after MSF4J 2.0.1 which are available in WSO2's Maven repository. This article shows you how to create a microservice using the latest stable release of MS4J Framework.

Requirements:
  • Java Development Kit 1.8
  • Apache Maven 3.x
  • Postman or CURL
  • Eclipse IDE for Java EE Developers / Any IDEs with Maven support
Read More

Install Oracle JDK in Ubuntu

Oracle Java is the proprietary, reference implementation for Java. This is no longer currently available in a supported Ubuntu repository. This article shows you the way to manually install the latest Oracle Java Development Kit (Oracle JDK) in Ubuntu.


Note: This article uses JDK8_Update_$java_update_no to demonstrate the installation. In the provided commands, replace the version specific paths and file names to your downloaded version.

Step 1:
Download the latest JDK(jdk-Xuxx-linux-xXX.tar.gz) from this official link.
If you want to download to a remote server or if you simply prefer wget, use the command given in this Stackoverflow answer: Downloading JDK
Read More

Parse PCAP files in Java

This article is for those who have spent hours like me to find a good library that can parse raw PCAP files in Java. There are plenty of open source libraries already available for Java but most of them are acting as a wrapper to the libpcap library which makes them hard to use for simple use cases. The library I came across: pkts is a pure Java library which can be easily imported into your project as a Maven dependency. This article introduces the pkts library using a HelloWorld application to parse a PCAP output.

Requirements:
  • Java IDE with Maven support (Eclipse IDE for Java EE is used here)
  • The PCAP file you want to parse
Read More

Microservices in a minute

Microservices get more light in recent years as a new service oriented architecture with a high level of modularity. Compared to traditional web services either they are SOAP or REST, microservices are small in size and complexity but brings the cohesiveness to web services. Microservices do not require a servlet container or web server to be deployed instead they are created as individual JAR files which you can run as any other JAR files. The idea behind microservices is creating tiny individual services which can be developed, deployed and modified with less effort. This article introduces microservices using WSO2 Microservices Framework in a minute.

Microservices in a minute
Image Credits: http://wso2.com/blogs/thesource/2016/05/enabling-microservice-architecture-with-middleware/

WSO2 Microservices Framework for Java (WSO2 MSF4J) is a lightweight annotation based model to develop microservices in Java. Developers can create a microservice using just two or three classes which of course require a framework to take care of the householding tasks. The MSF4J hides all the complexities of creating a web service and provide a decent API to develop microservices in minutes. Let's dirt your hands with microservices:

Read More

GitHub Pull Request - A Complete Guide For Beginners

The power of open source applications highly depends on the contributions from the community. As a maintainer of two small open source applications, I have seen the power of such contributions. However, there are people who like to contribute but fail because of the technical barriers. This tutorial shows you a top to bottom step by step guidance on how to add a new feature to an open source application that is available on GitHub.

Safe Eyes is one of my open source application available for Linux users to prevent asthenopia (eye strain) due to RSI. This tutorial shows you how to contribute in updating the French translation of this application by sending a pull request.

Prerequisite:

  • GitHub account
  • Git (Install Git)
  • Atom text editor (You can use any text editors but I prefer Atom due to its support for GitHub)
Step 1: Login to your GitHub account and then visit the GitHub repository of your interested open source application. In this tutorial, I use https://github.com/slgobinath/SafeEyes.


Read More

WSO2 DAS - Hello World!

WSO2 Data Analytics Server is a smart analytics platform for both real-time and batch analytics. The real-time analytics is provided through their powerful open source Complex Event Processing Engine engine Siddhi. This article focuses on the complex event processing capability of the DAS server and provides a quick start guide on how to setup an event stream and process events generated by an HTTP source.

WSO2 DAS - Hello World!

Prerequisite:
Oracle JDK 1.7 or latest
Memory – Minimum 2GB
Disk – Minimum 1 GB
For more details: Installation Prerequisites

Read More

Is WSO2 CEP Dead? No! Here’s Why…

During the WSO2 Con US 2017, a major business decision is announced. Due to some business decisions, WSO2 promotes the Data Analytic Server (DAS) (They may change this name very soon) over the complex event processor. For those who haven’t heard about DAS even though it has been there for a long period, it is another product of WSO2 which contains the Complex Event Processor for real-time analytics and some other components for batch processing and predictive analytics.

WSO2 Smart Analytics


Then what is the future of WSO2 CEP?
Still, WSO2 CEP is available on their site but users are recommended to migrate to DAS. Since all the WSO2 products are developed using the same framework called Carbon and also due to their similar aesthetic design across all the platforms, the transition from CEP to DAS should be easy sledding even for the beginners. Let me repeat, DAS is a creamed version of CEP which provides all the features of CEP as it was in CEP and some additional features for batch processing. This short post is to announce that the existing articles about WSO2 CEP are not going to be outdated or useless due to this transition. You can simply download and extract the DAS pack and test all the features I’ve discussed so far using DAS.

Is WSO2 CEP Dead? No! Here’s Why…


If you still have confusion, just go and download the DAS pack from the official site. Extract the pack and start the server using the following command:
./bin/wso2server.sh
Go to the admin dashboard and you will see all the features of WSO2 CEP except the color of the top banner ;-)
Therefore, my upcoming articles about complex event processing will use WSO2 DAS not CEP.
Read More

Apache Thrift Client for WSO2 CEP

In the series of WSO2 CEP tutorials, this article explains how to create Apache Thrift publisher and receiver for a CEP server in Python. Even though this is javahelps.com, I use Python since publisher and receiver in Java are already available in WSO2 CEP samples. 
One of the major advantages of Apache Thrift is the support for various platforms. Therefore this tutorial can be simply adapted to any other languages with less effort.

WSO2 CEP supports various protocols as a server to send and receive events. However using TCP protocol is recommended when the performance is a critical requirement. To achieve the security without performance trade-off, an SSL connection is used to authenticate the connection and the actuals events are be sent to the interested parties through unencrypted TCP connections.

Apache Thrift Client for WSO2 CEP

Requirements:
  • Apache Thrift
  • Python 2
  • Java Development Kit 1.7 or Latest
  • WSO2 Complex Event Processor 4.2.0

Setting up the environment
Step 1:
If you don't have Apache Thrift compiler, install it using the following command. Windows and Mac users, please find the binary files here.
sudo apt install thrift-compiler

Install the Python thrift library using the following command. Windows and Mac users must install this library using pip or manually include the library into the project.
sudo apt install python-thrift

Step 2:
Download and extract the WSO2 CEP pack. Start the server using the following command.
sh <CEP_HOME>/bin/wso2server.sh

Step 3:
Create a new stream using the following properties.
Event Stream Name com.javahelps.stream.Temperature
Payload data attributes
  • location STRING
  • temp INT

Step 4:
Create a publisher for testing purposes using the following configurations:
Event Publisher Name com.javahelps.publisher.logger.Temperature
Event Source com.javahelps.stream.Temperature
Output Event Adapter Type logger
Message Format text
Apache Thrift Client for WSO2 CEP
Now we have the required platform and let's jump into coding.

Step 1:
Create a new workspace directory wherever you like. Create a child directory wso2-thrift inside it and download the following files into the wso2-thrift directory from WSO2 GitHub repository.
  • Data.thrift
  • Exception.thrift
  • ThriftEventTransmissionService.thrift
  • ThriftSecureEventTransmissionService.thrift
Step 2:
Open the terminal inside the parent directory and execute the following commands to generate Python source files.
thrift -r --gen py wso2-thrift/Data.thrift
thrift -r --gen py wso2-thrift/Exception.thrift
thrift -r --gen py wso2-thrift/ThriftEventTransmissionService.thrift
thrift -r --gen py wso2-thrift/ThriftSecureEventTransmissionService.thrift
After executing these commands, you will get a gen-py directory which contains the generated source files.

Step 3:
The thrift receiver will run as a server and which requires an SSL certificate. Create the certificate using the following command. You need to answer the questions asked by openssl during this process.
openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout server.key -out server.crt

For example:
Country Name (2 letter code) [AU] CA
State or Province Name (full name) [Some-State] Ontario
Locality Name (eg, city) [] London
Organization Name (eg, company) [Internet Widgits Pty Ltd] Western University
Organizational Unit Name (eg, section) []: Faculty Of Engineering
Common Name (e.g. server FQDN or YOUR name) [] Gobinath
Email Address [] admin@javahelps.com

Step 5:
Once you have created the server.crt and server.key files, combine them into server.pem file using the following command.
cat server.crt server.key > server.pem

Step 6:
To register our SSL server as a reliable client to the CEP, enter the following command and restart the CEP.
keytool -importcert -file server.crt -keystore <CEP_HOME>/repository/resources/security/client-truststore.jks -alias "PythonThriftClient"

In the above command, the alias  PythonThriftClient can be anything and the <CEP_HOME> must be replaced by the actual home directory of WSO2 CEP.

When prompted, enter the keystore password 'wso2carbon' and confirm to trust the certificate.

Step 7:
Create a new file ServerHandler.py with the following code:
#!/usr/bin/env python


class ServerHandler(object):
    """
        ServerHandler contains the functions to serve the requests from WSO2 CEP.
    """
    def __init__(self):
        pass

    """
        Receive the username and password, verify it and return a unique session id.
    """
    def connect(self, uname, password):
        print 'Connect ' + uname + ':' + password
        return '123456'  # A random session id

    """
        Destroy the session.
    """
    def disconnect(self, sessionId):
        print 'Disconnect the session ' + str(sessionId)

    def defineStream(self, sessionId, streamDefinition):
        return ''

    def findStreamId(self, sessionId, streamName, streamVersion):
        return ''

    """
        Receive the event and process it.
    """
    def publish(self, eventBundle):
        print 'Received a new event: ' + str(eventBundle) + '\n'

    def deleteStreamById(self, sessionId, streamId):
        pass

    def deleteStreamByNameVersion(self, sessionId, streamName, streamVersion):
        pass

Step 8:
Create a new file TCPServer.py with the following code:
#!/usr/bin/env python

import sys
sys.path.append('gen-py')
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
from ServerHandler import ServerHandler
from ThriftEventTransmissionService import ThriftEventTransmissionService

SERVER_ADDRESS = 'localhost'
SERVER_PORT = 8888

handler = ServerHandler()
processor = ThriftEventTransmissionService.Processor(handler)
transport = TSocket.TServerSocket(SERVER_ADDRESS, SERVER_PORT)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()

server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)

print 'Starting TCP Server at ' + SERVER_ADDRESS + ':' + str(SERVER_PORT)

server.serve()

Note the server port is 8888

Step 9:
Create a new file SSLServer.py with the following code:
#!/usr/bin/env python

import sys
import os
sys.path.append('gen-py')
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
from thrift.transport import TSSLSocket
from ServerHandler import ServerHandler
from ThriftSecureEventTransmissionService import ThriftSecureEventTransmissionService


SERVER_ADDRESS = 'localhost'
SERVER_PORT = 8988
CERTIFICATE_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'server.pem')

handler = ServerHandler()
processor = ThriftSecureEventTransmissionService.Processor(handler)
transport = TSSLSocket.TSSLServerSocket(SERVER_ADDRESS, SERVER_PORT, certfile=CERTIFICATE_FILE)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()

server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)

print 'Starting SSL Server at ' + SERVER_ADDRESS + ':' + str(SERVER_PORT)

server.serve()

Note the server port is 8988

Step 10:
Create a new file Publisher.py with the following code:
#!/usr/bin/env python

import sys
import time
sys.path.append('gen-py')

from ThriftSecureEventTransmissionService import ThriftSecureEventTransmissionService
from ThriftEventTransmissionService import ThriftEventTransmissionService
from ThriftEventTransmissionService.ttypes import *

from thrift import Thrift
from thrift.transport import TSSLSocket
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol


class Publisher:

    """
        Create SSL and TCP sockets along with buffer and binary protocol.
    """
    def __init__(self, ip, ssl_port, tcp_port):
        # Make SSL socket
        self.ssl_socket = TSSLSocket.TSSLSocket(ip, ssl_port, False)
        self.ssl_transport = TTransport.TBufferedTransport(self.ssl_socket)
        self.ssl_protocol = TBinaryProtocol.TBinaryProtocol(self.ssl_transport)

        # Make TCP socket
        self.tcp_socket = TSocket.TSocket(ip, tcp_port)
        self.tcp_transport = TTransport.TBufferedTransport(self.tcp_socket)
        self.tcp_protocol = TBinaryProtocol.TBinaryProtocol(self.tcp_transport)

    def connect(self, username, password):
        # Create a client to use the protocol encoder
        self.ssl_client = ThriftSecureEventTransmissionService.Client(self.ssl_protocol)
        self.tcp_client = ThriftEventTransmissionService.Client(self.tcp_protocol)

        # Make connection
        self.ssl_socket.open()
        # self.transport.open()
        self.sessionId = self.ssl_client.connect(username, password)

        self.tcp_socket.open()

    def defineStream(self, streamDef):
        # Create Stream Definition
        return self.tcp_client.defineStream(self.sessionId, streamDef)

    def publish(self, streamId, *attributes):
        # Build thrift event bundle
        event = EventBundle()
        event.setSessionId(self.sessionId)
        event.setEventNum(1)
        event.addLongAttribute(time.time() * 1000)
        event.addStringAttribute(streamId)
        for attr in attributes:
            if isinstance(attr, int):
                event.addIntAttribute(attr)
            elif isinstance(attr, basestring):
                event.addStringAttribute(attr)
            elif isinstance(attr, long):
                event.addLongAttribute(attr)
            elif isinstance(attr, float):
                event.addDoubleAttribute(attr)
            elif isinstance(attr, bool):
                event.addBoolAttribute(attr)
            else:
                event.setArbitraryDataMapMap(attr)

        # Publish
        self.tcp_client.publish(event.getEventBundle())

    def disconnect(self):
        # Disconnect
        self.ssl_client.disconnect(self.sessionId)
        self.ssl_transport.close()
        self.ssl_socket.close()

        self.tcp_transport.close()
        self.tcp_socket.close()


class EventBundle:
    __sessionId = ""
    __eventNum = 0
    __intAttributeList = []
    __longAttributeList = []
    __doubleAttributeList = []
    __boolAttributeList = []
    __stringAttributeList = []
    __arbitraryDataMapMap = None

    def setSessionId(self, sessionId):
        self.__sessionId = sessionId

    def setEventNum(self, num):
        self.__eventNum = num

    def addIntAttribute(self, attr):
        self.__intAttributeList.append(attr)

    def addLongAttribute(self, attr):
        self.__longAttributeList.append(attr)

    def addDoubleAttribute(self, attr):
        self.__doubleAttributeList.append(attr)

    def addBoolAttribute(self, attr):
        self.__boolAttributeList.append(attr)

    def addStringAttribute(self, attr):
        self.__stringAttributeList.append(attr)

    def setArbitraryDataMapMap(self, attr):
        self.__arbitraryDataMapMap = attr

    def getEventBundle(self):
        return Data.ttypes.ThriftEventBundle(self.__sessionId, self.__eventNum, self.__intAttributeList, self.__longAttributeList, self.__doubleAttributeList, self.__boolAttributeList, self.__stringAttributeList, self.__arbitraryDataMapMap)

Step 11:
Create a new file PublisherClient.py with the following code:
#!/usr/bin/env python

from Publisher import Publisher

CEP_SERVER_ADDRESS = '192.168.122.1' # IP address of the server. You can find it at the end of the CEP console
SSL_PORT = 7711  # Thrift SSL port of the server
TCP_PORT = 7611     # Thrift TCP port of the server
USERNAME = 'admin' # Username
PASSWORD = 'admin'  # Passowrd

publisher = Publisher(CEP_SERVER_ADDRESS, SSL_PORT, TCP_PORT)

# Connect to server with username and password
publisher.connect(USERNAME, PASSWORD)

# Publish an event to the Temperature stream
publisher.publish('com.javahelps.stream.Temperature:1.0.0', 'Kitchen', 56)

# Disconnect
publisher.disconnect()
Change the CEP_SERVER_ADDRESS according to your system. The SSL_PORT and TCP_PORT of the CEP must be as given default values. If they differ, we will update them in Step 15.

See the complete directory structure below:
.
├── gen-py
│   ├── Data
│   │   ├── constants.py
│   │   ├── __init__.py
│   │   ├── __init__.pyc
│   │   ├── ttypes.py
│   │   └── ttypes.pyc
│   ├── Exception
│   │   ├── constants.py
│   │   ├── __init__.py
│   │   ├── __init__.pyc
│   │   ├── ttypes.py
│   │   └── ttypes.pyc
│   ├── __init__.py
│   ├── ThriftEventTransmissionService
│   │   ├── constants.py
│   │   ├── __init__.py
│   │   ├── __init__.pyc
│   │   ├── ThriftEventTransmissionService.py
│   │   ├── ThriftEventTransmissionService.pyc
│   │   ├── ThriftEventTransmissionService-remote
│   │   ├── ttypes.py
│   │   └── ttypes.pyc
│   └── ThriftSecureEventTransmissionService
│       ├── constants.py
│       ├── __init__.py
│       ├── __init__.pyc
│       ├── ThriftSecureEventTransmissionService.py
│       ├── ThriftSecureEventTransmissionService.pyc
│       ├── ThriftSecureEventTransmissionService-remote
│       ├── ttypes.py
│       └── ttypes.pyc
├── PublisherClient.py
├── Publisher.py
├── server.crt
├── ServerHandler.py
├── server.key
├── server.pem
├── SSLServer.py
├── TCPServer.py
└── wso2-thrift
    ├── Data.thrift
    ├── Exception.thrift
    ├── ThriftEventTransmissionService.thrift
    └── ThriftSecureEventTransmissionService.thrift

Step 12:
Go to the CEP dashboard and create a new WSO2 event receiver. As shown below.
Event Receiver Name com.javahelps.receiver.Temperature
Input Event Adapter Type wso2event
Event Stream com.javahelps.stream.Temperature:1.0.0
Message Format wso2event
Apache Thrift Client for WSO2 CEP

Compare the ports available for Thrift protocol with the SSL_PORT and TCP_PORT in  PublisherClient.py . If there are any differences, update the  SSL_PORT and TCP_PORT values.

Step 13:
Add another publisher to publish the events to our Python servers. Use the following configuration.
Event Publisher Name com.javahelps.publisher.thrift.Temperature
Event Source com.javahelps.stream.Temperature
Output Event Adapter Type wso2event
Receiver URL tcp://localhost:8888
Authenticator URL ssl://localhost:8988
Username admin (Can be anything. Our authenticator accepts any username)
Password admin (Can be anything. Our authenticator accepts any password)
Protocol thrift
Publishing Mode non-blocking
Publishing Timeout 0
Message Format wso2event
Apache Thrift Client for WSO2 CEP

Step 14:
Start the TCPServer.py and SSLSErver.py using the following commands.
python TCPServer.py
python SSLSErver.py

Step 15:
Send an event to the WSO2 CEP server from PublisherClient using the following command and see the results in terminal.
python  PublisherClient.py

Apache Thrift Client for WSO2 CEP

Now we have successfully created a Thrift event publisher and receiver to send and receive events to and from WSO2 Complex Event Processor. To implement such a client in a different language, you need to change the Python related steps only. The Python clients along with the CEP artifacts are available at the following GitHub repository.

Find the project @ GitHub.
Read More
Siddhi 4.0.0 Early Access

Siddhi 4.0.0 Early Access

Siddhi 4.0.0 is being developed using Java 8 with major core level changes and features. One of the fundamental change to note is that some features of Siddhi have been moved as external extensions to Siddhi and WSO2 Complex Event Processor. This tutorial shows you, how to migrate the complex event project developed using Siddhi 3.x to Siddhi 4.x. Take the sample project developed in "Complex Event Processing - An Introduction" article which was developed using Siddhi 3.1.0.

Warning: Still, Siddhi 4.0.0 is not production ready and it is not recommended to use it until officially released by WSO2.

Required Tools:
  • Git
  • Apache Maven
  • Java 1.8
  • Eclipse or IntelliJ IDEA

Since Siddhi 4.0.0 is not officially released yet, we are going to compile the Siddhi 4.0.0-SNAPSHOT version manually.

Step 1:
Clone the Siddhi project using the following command
git clone https://github.com/wso2/siddhi.git

Step 2:
Set the following environment variable to avoid the Maven OutOfMemoryError.
MAVEN_OPTS="-Xms768m -Xmx3072m -XX:MaxPermSize=1200m"

Step 3:
Open the terminal in the siddhi directory and execute the following command to build Siddhi.
mvn clean install

You can reduce the time to build by skipping the tests using the following command
mvn clean install -Dmaven.test.skip=true

The external unique time batch window used in the "Complex Event Processing - An Introduction" is no more available in the Siddhi core. Most of the features are moved to a separate GitHub account known as "wso2-extensions". The external unique time batch window is available under the siddhi-execution-unique. Follow the steps to clone and build the unique extension.

Step 1:
Clone the siddhi-execution-unique project using the following command.
git clone https://github.com/wso2-extensions/siddhi-execution-unique.git

Step 2:
Open the terminal in the siddhi-execution-unique directory and execute the following command to build Siddhi.
mvn clean install

You can reduce the time to build by skipping the tests using the following command
mvn clean install -Dmaven.test.skip=true

Now clone the siddhidemo project which is from the Complex Event Processing - An Introduction" article or create a new project similar to that as suggested in that article.

Step 1:
Create a new Maven project in Eclipse using the following information.
Group Id: com.javahelps
Artifact Id: siddhimigrationdemo

Step 2:
Add the following dependencies to the pom.xml. Compared to Siddhi 3.x project, siddhi-execution-unique is required in addition.

Note: Check the pom file of Siddhi and siddhi-execution-unique projects and use the same versions in the following properties tag.
<properties>
    <siddhi.version>4.0.0-SNAPSHOT</siddhi.version>
    <siddhi.execution.unique.version>3.1.3-SNAPSHOT</siddhi.execution.unique.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
    <!--Siddhi -->
    <dependency>
        <groupId>org.wso2.siddhi</groupId>
        <artifactId>siddhi-query-api</artifactId>
        <version>${siddhi.version}</version>
    </dependency>
    <dependency>
        <groupId>org.wso2.siddhi</groupId>
        <artifactId>siddhi-core</artifactId>
        <version>${siddhi.version}</version>
    </dependency>
    <dependency>
        <groupId>org.wso2.siddhi</groupId>
        <artifactId>siddhi-query-compiler</artifactId>
        <version>${siddhi.version}</version>
    </dependency>
    <dependency>
        <groupId>org.wso2.extension.siddhi.execution.unique</groupId>
        <artifactId>siddhi-execution-unique</artifactId>
        <version>${siddhi.execution.unique.version}</version>
    </dependency>
</dependencies>

Step 3:
Create a new package com.javahelps.siddhimigrationdemo.

Step 4:
Create the Main.java as shown below. If you compare this class with the one used in Siddhi 3.x project, the way to use external time batch window is different. In Siddhi 3.x, the external time batch window is referred as #window.uniqueExternalTimeBatch but based on Siddhi 4.0.0 and the unique extension, it must be referred as #window.unique:externalTimeBatch.
package com.javahelps.siddhimigrationdemo;

import java.util.Random;

import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;

public class Main {
    //    SiddhiDe
    private static final String[] CUSTOMERS = {"Alice", "Barby", "Carol", "Diana"};
    private static final String[] ITEMS = {"Cocoa-Butter Lotion", "Purse-XL", "Purse-L", "Beer", "Biscuit",
            "Chocolate", "ZMA"};
    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();

        String streams = "define stream purchaseStream (customerName string, item string, timestamp long); ";
        String query = "partition with (customerName of purchaseStream) " +
                "begin " +
                "from purchaseStream[item == 'Cocoa-Butter Lotion' OR item == 'Purse-XL' OR item == 'ZMA']#window" +
                ".unique:externalTimeBatch(item, timestamp, 500 milliseconds) " +
                "select customerName, convert(count(item), 'double') / 3.0 * 100.0 as noOfPurchases insert into " +
                "possiblePregnant; " +
                "end ";

        // Create ExecutionPlanRuntime using stream definition and query
        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(streams + query);

        try {
            // Receive the output events
            executionPlanRuntime.addCallback("possiblePregnant", new StreamCallback() {
                @Override
                public void receive(Event[] events) {
                    String output = String.format("\t\t\t%s is pregnant with %.2f%% confidence.", events[0].getData(0),
                            events[0].getData(1));
                    System.out.println(output);
                }
            });

            // Send input events
            InputHandler purchaseStream = executionPlanRuntime.getInputHandler("purchaseStream");
            executionPlanRuntime.start();
            for (int i = 0; i < 1000; i++) {
                Object[] event = generateEvent();
                purchaseStream.send(event);
                Thread.sleep(10); // Delay for 10 milliseconds
            }
        } finally {
            executionPlanRuntime.shutdown();
        }
    }

    private static Object[] generateEvent() {
        String name = CUSTOMERS[RANDOM.nextInt(CUSTOMERS.length)];
        String item = ITEMS[RANDOM.nextInt(ITEMS.length)];
        long time = System.currentTimeMillis(); // Current time

        System.out.println(name + " buys " + item);
        Object[] event = new Object[]{name, item, time};
        return event;
    }

}

Save all the changes and run the Main.java. You will see the same results as the Siddhi 3.x project.

Find the project @ GitHub.
Read More

JPA - Hello, World! using Hibernate 5

The article JPA - Hello, World! using Hibernate explains how to use Hibernate v4.x. This tutorial introduces using JPA with the latest Hibernate v5.2.3. Same as the previous article, it also targets beginners who are new to Hibernate and JPA.

You need Java 1.8, Eclipse for Java EE developers and MySQL server in your system in-order to try this tutorial.

Step 1:
Create a database “javahelps” and a table “student” in MySQL.
CREATE DATABASE IF NOT EXISTS javahelps;

CREATE  TABLE javahelps.student ( 
student_id INT NOT NULL ,
student_name VARCHAR(45) NOT NULL ,
student_age INT NOT NULL ,
PRIMARY KEY (student_id) );

Read More

WSO2 CEP - Output Mapping Using Registry Resource

Publishing the output is an important requirement of CEP. WSO2 CEP allows to convert an event to TEXT, XML or JSON, which is known as output mapping . This article explains how a registry resource can be used for custom event mapping in WSO2  CEP 4.2.0.


Step 1:
Start the WSO2 CEP and login to the management console.

Step 2:
Navigate to Home → Manage → Events → Streams → Add Event Stream.

Step 3:
Define an event stream as given below and click on ‘Add Event Stream’ button.
Name: sensorstream
Version: 1.0.0
Meta Data Attributes: sensorId, sensorName and language
Correlation Data Attributes: longitude and latitude
Payload Data Attributes: sensorValue


Step 4:
Navigate to Home → Registry → Browse.

Step 5:
Expand the /_system/config tree view and click on the config folder.

Step 6:
Click on the Add Collection link and add a new collection named template.


Step 7:
Same as Step 6, create another sub folder en inside the template folder.

Step 8:
Click on the ‘Add Resource’ button and select ‘Create Text Content’ as the method. This means that we are going to provide a content that is readable for human.
Provide the name as message and leave the Media type as text/plain. Suppose you want to create an XML or JSON resource, change the media type to application/xml or application/json respectively.

Step 9:
Add the content that matches your media type and click the Add button.


The message used in this screenshot is given below:
Welcome
Sensor {{meta_sensorName}} reports {{sensorValue}} from {{correlation_longitude}}:{{correlation_latitude}}

Step 10:
Navigate to the Home → Manage → Event → Publishers and click on the Add Event Publisher button.

Step 11:
Provide samplelogger as the publisher name, sensorstream:1.0.0 as the event source, logger as the output event adapter type and text as the message format. If your registry resource is XML or JSON, change the message format to your media type.

Then expand the Advanced configuration and select ‘Pick from registry’.
Click on the ‘Configuration Registry’ button, browse and select the message resource.


Leave the Cache Timeout with the default value.

Compare your configuration with the following screenshot.


Finally click the Add button.

Step 12:
Now move to the  Home → Tools → Event Simulator and select the sensorstream:1.0.0 as the events stream name.
Fill the form as shown below and click the send button.
sensorId: 10
sensorName: Temperature
language: en
longitude: 79.861256
latitude: 6.927131
sensorValue: 23.0


Once you clicked the Send button, you should get the following output in the terminal of CEP.



Parametrized Registry Path
WSO2 CEP allows you to parameterize the registry path as well. For example, assume that we want to select a registry resource depending on the meta_language. For this purpose, we need another registry resource.

As you did in Step 4 – 9, cerate a new folder named fr in /_system/config/template and a resource named message in /_system/config/template/fr folder.


The message used in this screenshot is given below:
Bienvenue
Sensor {{meta_sensorName}} reports {{sensorValue}} from {{correlation_longitude}}:{{correlation_latitude}}

Now the registry should contain both en/message and fr/message.


Step 13:
Delete the existing samplelogger publisher and create a new one using the same name as shown in Step 11. But this time, provide the following path as the registry path.
conf:/template/{{meta_language}}/message
Compare this path with the path in Step 11. Instead of hard coding the language folder name en or fr, the attribute meta_language is used. At the runtime, the meta_language will be replaced by the actual language name and the resource at that path will be used by the publisher.


Step 14:
Repeat the Step 12 with languages en, fr and something other than English and French.
In the terminal, CEP will print the message in English and French according to the input and thrown an exception if the resource for the given language does not exists.


If the registry does not contain a resource at the run time evaluated path, an exception will be thrown.

For more details about output mapping, visit to the official document: Output Mapping Types
Read More

Contact Form

Name

Email *

Message *