Apache Thrift Client for WSO2 CEP

In this installment of the WSO2 CEP tutorial series, we create an Apache Thrift publisher and receiver for a CEP server in Python. Even though this is javahelps.com, I use Python because a Java publisher and receiver are already available in the WSO2 CEP samples.
One of the major advantages of Apache Thrift is its support for many platforms, so this tutorial can be adapted to other languages with little effort.

WSO2 CEP supports various protocols for sending and receiving events. However, TCP is recommended when performance is critical. To achieve security without a performance trade-off, an SSL connection is used to authenticate the client, and the actual events are then sent to the interested parties over unencrypted TCP connections.
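
The split between the two channels can be pictured with a toy in-memory model. This is only a sketch of the idea, not WSO2 or Thrift code: the class names ToyAuthChannel and ToyEventChannel are hypothetical, and no real sockets or encryption are involved. The credentials are checked once on the "secure" side, and the event side only looks at the session id it is handed.

```python
import uuid


class ToyAuthChannel(object):
    """Stands in for the SSL channel: checks credentials, issues session ids."""

    def __init__(self, users):
        self._users = dict(users)   # username -> password
        self.sessions = set()       # session ids that are currently valid

    def connect(self, username, password):
        if self._users.get(username) != password:
            raise ValueError('authentication failed')
        session_id = uuid.uuid4().hex
        self.sessions.add(session_id)
        return session_id

    def disconnect(self, session_id):
        self.sessions.discard(session_id)


class ToyEventChannel(object):
    """Stands in for the plain TCP channel: accepts events for valid sessions."""

    def __init__(self, auth):
        self._auth = auth
        self.received = []

    def publish(self, session_id, event):
        if session_id not in self._auth.sessions:
            raise ValueError('unknown session')
        self.received.append(event)


auth = ToyAuthChannel({'admin': 'admin'})
events = ToyEventChannel(auth)
session = auth.connect('admin', 'admin')   # credentials cross the secure channel once
events.publish(session, ('Kitchen', 56))   # events carry only the session id
auth.disconnect(session)
```

The Publisher class later in this tutorial follows the same shape: connect goes through the SSL client, publish goes through the TCP client.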


Requirements:
  • Apache Thrift
  • Python 2
  • Java Development Kit 1.7 or later
  • WSO2 Complex Event Processor 4.2.0

Setting up the environment
Step 1:
If you don't have the Apache Thrift compiler, install it using the following command. Windows and Mac users, please find the binary files here.
sudo apt install thrift-compiler

Install the Python thrift library using the following command. Windows and Mac users must install this library using pip or manually include the library into the project.
sudo apt install python-thrift

Step 2:
Download and extract the WSO2 CEP pack. Start the server using the following command.
sh <CEP_HOME>/bin/wso2server.sh

Step 3:
Create a new stream using the following properties.
Event Stream Name: com.javahelps.stream.Temperature
Payload data attributes:
  • location: STRING
  • temp: INT
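
For reference, the same stream can be described as a JSON stream definition, which is the kind of string the defineStream call of the Thrift service expects. The sketch below assumes the usual WSO2 layout with name, version and payloadData keys and the default version 1.0.0; build_stream_definition is a hypothetical helper, not part of any WSO2 library.

```python
import json


def build_stream_definition(name, version, payload):
    """Return (stream_id, definition_json) for a list of (attribute, TYPE) pairs."""
    definition = {
        'name': name,
        'version': version,
        # Assumed layout: one {"name", "type"} object per payload attribute
        'payloadData': [{'name': attr, 'type': attr_type}
                        for attr, attr_type in payload],
    }
    stream_id = name + ':' + version
    return stream_id, json.dumps(definition, sort_keys=True)


stream_id, definition = build_stream_definition(
    'com.javahelps.stream.Temperature', '1.0.0',
    [('location', 'STRING'), ('temp', 'INT')])
print(stream_id)
print(definition)
```

The stream id printed here (name, colon, version) is the value passed to publish in PublisherClient.py.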

Step 4:
Create a publisher for testing purposes using the following configurations:
Event Publisher Name: com.javahelps.publisher.logger.Temperature
Event Source: com.javahelps.stream.Temperature
Output Event Adapter Type: logger
Message Format: text
Now that the platform is ready, let's jump into coding.

Step 1:
Create a new workspace directory wherever you like. Create a child directory wso2-thrift inside it and download the following files from the WSO2 GitHub repository into the wso2-thrift directory.
  • Data.thrift
  • Exception.thrift
  • ThriftEventTransmissionService.thrift
  • ThriftSecureEventTransmissionService.thrift
Step 2:
Open the terminal inside the parent directory and execute the following commands to generate Python source files.
thrift -r --gen py wso2-thrift/Data.thrift
thrift -r --gen py wso2-thrift/Exception.thrift
thrift -r --gen py wso2-thrift/ThriftEventTransmissionService.thrift
thrift -r --gen py wso2-thrift/ThriftSecureEventTransmissionService.thrift
After executing these commands, you will get a gen-py directory which contains the generated source files.
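
If you prefer to drive the compiler from a script, the four commands can be generated in a loop. This is a minimal sketch that assumes the thrift executable is on the PATH and the script runs from the parent directory; build_thrift_commands and generate_sources are hypothetical helper names.

```python
import subprocess

IDL_FILES = [
    'Data.thrift',
    'Exception.thrift',
    'ThriftEventTransmissionService.thrift',
    'ThriftSecureEventTransmissionService.thrift',
]


def build_thrift_commands(idl_dir='wso2-thrift'):
    """Return one argv list per IDL file, mirroring the commands above."""
    return [['thrift', '-r', '--gen', 'py', idl_dir + '/' + name]
            for name in IDL_FILES]


def generate_sources(idl_dir='wso2-thrift'):
    """Run the compiler for each file; raises CalledProcessError on failure."""
    for command in build_thrift_commands(idl_dir):
        subprocess.check_call(command)
```

Calling generate_sources() from the parent directory should leave the same gen-py directory behind as running the commands by hand.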

Step 3:
The Thrift receiver runs as a server, which requires an SSL certificate. Create the certificate using the following command. You need to answer the questions asked by openssl during this process.
openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout server.key -out server.crt

For example:
Country Name (2 letter code) [AU]: CA
State or Province Name (full name) [Some-State]: Ontario
Locality Name (eg, city) []: London
Organization Name (eg, company) [Internet Widgits Pty Ltd]: Western University
Organizational Unit Name (eg, section) []: Faculty Of Engineering
Common Name (e.g. server FQDN or YOUR name) []: Gobinath
Email Address []: admin@javahelps.com

Step 5:
Once you have created the server.crt and server.key files, combine them into a single server.pem file using the following command.
cat server.crt server.key > server.pem
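
A quick sanity check of the combined file can save some debugging later, because the SSL server needs both the certificate and the private key in server.pem. The following sketch only inspects the PEM block headers and does not validate the contents; pem_block_types and has_certificate_and_key are hypothetical helper names.

```python
import re

# Matches headers such as "-----BEGIN CERTIFICATE-----"
_PEM_BLOCK = re.compile(r'-----BEGIN ([A-Z ]+)-----')


def pem_block_types(pem_text):
    """Return the labels of the PEM blocks found in the text, in order."""
    return _PEM_BLOCK.findall(pem_text)


def has_certificate_and_key(pem_text):
    """True when the text holds a certificate and some kind of private key."""
    labels = pem_block_types(pem_text)
    has_cert = 'CERTIFICATE' in labels
    # Covers both "PRIVATE KEY" and "RSA PRIVATE KEY" headers
    has_key = any(label.endswith('PRIVATE KEY') for label in labels)
    return has_cert and has_key
```

To use it, read the file and pass its text in, for example has_certificate_and_key(open('server.pem').read()).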

Step 6:
To make the CEP trust our SSL server, import its certificate into the CEP client truststore with the following command and restart the CEP.
keytool -importcert -file server.crt -keystore <CEP_HOME>/repository/resources/security/client-truststore.jks -alias "PythonThriftClient"

In the above command, the alias PythonThriftClient can be anything, and <CEP_HOME> must be replaced by the actual home directory of WSO2 CEP.

When prompted, enter the keystore password 'wso2carbon' and confirm to trust the certificate.

Step 7:
Create a new file ServerHandler.py with the following code:
#!/usr/bin/env python


class ServerHandler(object):
    """
        ServerHandler contains the functions to serve the requests from WSO2 CEP.
    """
    def __init__(self):
        pass

    def connect(self, uname, password):
        """
            Receive the username and password, verify it and return a unique session id.
        """
        print 'Connect ' + uname + ':' + password
        return '123456'  # A random session id

    def disconnect(self, sessionId):
        """
            Destroy the session.
        """
        print 'Disconnect the session ' + str(sessionId)

    def defineStream(self, sessionId, streamDefinition):
        return ''

    def findStreamId(self, sessionId, streamName, streamVersion):
        return ''

    def publish(self, eventBundle):
        """
            Receive the event and process it.
        """
        print 'Received a new event: ' + str(eventBundle) + '\n'

    def deleteStreamById(self, sessionId, streamId):
        pass

    def deleteStreamByNameVersion(self, sessionId, streamName, streamVersion):
        pass
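
The publish method above simply prints the bundle. If you want the individual values, the bundle can be unpacked along the following lines. This sketch assumes a bundle carrying a single event of the Temperature stream, laid out the way Publisher.py builds it later in this tutorial: the timestamp first in the long list, the stream id first in the string list, then the payload values. FakeBundle is a hypothetical stand-in for the generated ThriftEventBundle so that the example runs without the Thrift library, and the field names are assumed to match the generated class.

```python
from collections import namedtuple

# Stand-in for the generated ThriftEventBundle (only the fields used here)
FakeBundle = namedtuple(
    'FakeBundle', 'intAttributeList longAttributeList stringAttributeList')


def decode_temperature_event(bundle):
    """Unpack a single-event bundle of the Temperature stream into a dict."""
    return {
        'timestamp': bundle.longAttributeList[0],
        'stream_id': bundle.stringAttributeList[0],
        'location': bundle.stringAttributeList[1],
        'temp': bundle.intAttributeList[0],
    }


sample = FakeBundle(
    [56], [1500000000000],
    ['com.javahelps.stream.Temperature:1.0.0', 'Kitchen'])
print(decode_temperature_event(sample))
```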

Step 8:
Create a new file TCPServer.py with the following code:
#!/usr/bin/env python

import sys
sys.path.append('gen-py')
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
from ServerHandler import ServerHandler
from ThriftEventTransmissionService import ThriftEventTransmissionService

SERVER_ADDRESS = 'localhost'
SERVER_PORT = 8888

handler = ServerHandler()
processor = ThriftEventTransmissionService.Processor(handler)
transport = TSocket.TServerSocket(SERVER_ADDRESS, SERVER_PORT)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()

server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)

print 'Starting TCP Server at ' + SERVER_ADDRESS + ':' + str(SERVER_PORT)

server.serve()

Note that the server port is 8888. The same port is used in the Receiver URL in Step 13.

Step 9:
Create a new file SSLServer.py with the following code:
#!/usr/bin/env python

import sys
import os
sys.path.append('gen-py')
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
from thrift.transport import TSSLSocket
from ServerHandler import ServerHandler
from ThriftSecureEventTransmissionService import ThriftSecureEventTransmissionService


SERVER_ADDRESS = 'localhost'
SERVER_PORT = 8988
CERTIFICATE_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'server.pem')

handler = ServerHandler()
processor = ThriftSecureEventTransmissionService.Processor(handler)
transport = TSSLSocket.TSSLServerSocket(SERVER_ADDRESS, SERVER_PORT, certfile=CERTIFICATE_FILE)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()

server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)

print 'Starting SSL Server at ' + SERVER_ADDRESS + ':' + str(SERVER_PORT)

server.serve()

Note that the server port is 8988. The same port is used in the Authenticator URL in Step 13.

Step 10:
Create a new file Publisher.py with the following code:
#!/usr/bin/env python

import sys
import time
sys.path.append('gen-py')

from ThriftSecureEventTransmissionService import ThriftSecureEventTransmissionService
from ThriftEventTransmissionService import ThriftEventTransmissionService
from ThriftEventTransmissionService.ttypes import *
import Data.ttypes  # ThriftEventBundle is referenced explicitly in EventBundle below

from thrift import Thrift
from thrift.transport import TSSLSocket
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol


class Publisher:

    """
        Create SSL and TCP sockets along with buffer and binary protocol.
    """
    def __init__(self, ip, ssl_port, tcp_port):
        # Make SSL socket
        self.ssl_socket = TSSLSocket.TSSLSocket(ip, ssl_port, False)
        self.ssl_transport = TTransport.TBufferedTransport(self.ssl_socket)
        self.ssl_protocol = TBinaryProtocol.TBinaryProtocol(self.ssl_transport)

        # Make TCP socket
        self.tcp_socket = TSocket.TSocket(ip, tcp_port)
        self.tcp_transport = TTransport.TBufferedTransport(self.tcp_socket)
        self.tcp_protocol = TBinaryProtocol.TBinaryProtocol(self.tcp_transport)

    def connect(self, username, password):
        # Create a client to use the protocol encoder
        self.ssl_client = ThriftSecureEventTransmissionService.Client(self.ssl_protocol)
        self.tcp_client = ThriftEventTransmissionService.Client(self.tcp_protocol)

        # Make connection
        self.ssl_socket.open()
        # self.transport.open()
        self.sessionId = self.ssl_client.connect(username, password)

        self.tcp_socket.open()

    def defineStream(self, streamDef):
        # Create Stream Definition
        return self.tcp_client.defineStream(self.sessionId, streamDef)

    def publish(self, streamId, *attributes):
        # Build thrift event bundle
        event = EventBundle()
        event.setSessionId(self.sessionId)
        event.setEventNum(1)
        event.addLongAttribute(int(time.time() * 1000))  # Timestamp in whole milliseconds
        event.addStringAttribute(streamId)
        for attr in attributes:
            # bool must be tested before int because bool is a subclass of int
            if isinstance(attr, bool):
                event.addBoolAttribute(attr)
            elif isinstance(attr, int):
                event.addIntAttribute(attr)
            elif isinstance(attr, basestring):
                event.addStringAttribute(attr)
            elif isinstance(attr, long):
                event.addLongAttribute(attr)
            elif isinstance(attr, float):
                event.addDoubleAttribute(attr)
            else:
                event.setArbitraryDataMapMap(attr)

        # Publish
        self.tcp_client.publish(event.getEventBundle())

    def disconnect(self):
        # Disconnect
        self.ssl_client.disconnect(self.sessionId)
        self.ssl_transport.close()
        self.ssl_socket.close()

        self.tcp_transport.close()
        self.tcp_socket.close()


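class EventBundle:
    def __init__(self):
        # Keep the state per instance; class-level lists would be shared by
        # every bundle and accumulate attributes across publish calls
        self.__sessionId = ""
        self.__eventNum = 0
        self.__intAttributeList = []
        self.__longAttributeList = []
        self.__doubleAttributeList = []
        self.__boolAttributeList = []
        self.__stringAttributeList = []
        self.__arbitraryDataMapMap = None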

    def setSessionId(self, sessionId):
        self.__sessionId = sessionId

    def setEventNum(self, num):
        self.__eventNum = num

    def addIntAttribute(self, attr):
        self.__intAttributeList.append(attr)

    def addLongAttribute(self, attr):
        self.__longAttributeList.append(attr)

    def addDoubleAttribute(self, attr):
        self.__doubleAttributeList.append(attr)

    def addBoolAttribute(self, attr):
        self.__boolAttributeList.append(attr)

    def addStringAttribute(self, attr):
        self.__stringAttributeList.append(attr)

    def setArbitraryDataMapMap(self, attr):
        self.__arbitraryDataMapMap = attr

    def getEventBundle(self):
        return Data.ttypes.ThriftEventBundle(self.__sessionId, self.__eventNum, self.__intAttributeList, self.__longAttributeList, self.__doubleAttributeList, self.__boolAttributeList, self.__stringAttributeList, self.__arbitraryDataMapMap)
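
One detail of the type dispatch in publish deserves a standalone illustration: in Python, bool is a subclass of int, so a boolean attribute lands in the int list unless it is tested for first. The sketch below shows a check order that keeps the two apart. attribute_kind is a hypothetical helper, and for brevity it leaves out the long and basestring types that exist only in Python 2.

```python
def attribute_kind(value):
    """Name the typed list of the bundle that a Python value belongs in."""
    if isinstance(value, bool):        # must precede int: bool subclasses int
        return 'bool'
    if isinstance(value, int):
        return 'int'
    if isinstance(value, float):
        return 'double'
    if isinstance(value, str):
        return 'string'
    return 'arbitrary'


kinds = [attribute_kind(v) for v in ('Kitchen', 56, 21.5, True)]
print(kinds)  # ['string', 'int', 'double', 'bool']
```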

Step 11:
Create a new file PublisherClient.py with the following code:
#!/usr/bin/env python

from Publisher import Publisher

CEP_SERVER_ADDRESS = '192.168.122.1'  # IP address of the server. You can find it at the end of the CEP console
SSL_PORT = 7711  # Thrift SSL port of the server
TCP_PORT = 7611  # Thrift TCP port of the server
USERNAME = 'admin'  # Username
PASSWORD = 'admin'  # Password

publisher = Publisher(CEP_SERVER_ADDRESS, SSL_PORT, TCP_PORT)

# Connect to server with username and password
publisher.connect(USERNAME, PASSWORD)

# Publish an event to the Temperature stream
publisher.publish('com.javahelps.stream.Temperature:1.0.0', 'Kitchen', 56)

# Disconnect
publisher.disconnect()
Change CEP_SERVER_ADDRESS according to your system. SSL_PORT and TCP_PORT are set to the default Thrift ports of the CEP. If your server uses different ports, we will update them in Step 12.

See the complete directory structure below:
.
├── gen-py
│   ├── Data
│   │   ├── constants.py
│   │   ├── __init__.py
│   │   ├── __init__.pyc
│   │   ├── ttypes.py
│   │   └── ttypes.pyc
│   ├── Exception
│   │   ├── constants.py
│   │   ├── __init__.py
│   │   ├── __init__.pyc
│   │   ├── ttypes.py
│   │   └── ttypes.pyc
│   ├── __init__.py
│   ├── ThriftEventTransmissionService
│   │   ├── constants.py
│   │   ├── __init__.py
│   │   ├── __init__.pyc
│   │   ├── ThriftEventTransmissionService.py
│   │   ├── ThriftEventTransmissionService.pyc
│   │   ├── ThriftEventTransmissionService-remote
│   │   ├── ttypes.py
│   │   └── ttypes.pyc
│   └── ThriftSecureEventTransmissionService
│       ├── constants.py
│       ├── __init__.py
│       ├── __init__.pyc
│       ├── ThriftSecureEventTransmissionService.py
│       ├── ThriftSecureEventTransmissionService.pyc
│       ├── ThriftSecureEventTransmissionService-remote
│       ├── ttypes.py
│       └── ttypes.pyc
├── PublisherClient.py
├── Publisher.py
├── server.crt
├── ServerHandler.py
├── server.key
├── server.pem
├── SSLServer.py
├── TCPServer.py
└── wso2-thrift
    ├── Data.thrift
    ├── Exception.thrift
    ├── ThriftEventTransmissionService.thrift
    └── ThriftSecureEventTransmissionService.thrift

Step 12:
Go to the CEP dashboard and create a new WSO2 event receiver as shown below.
Event Receiver Name: com.javahelps.receiver.Temperature
Input Event Adapter Type: wso2event
Event Stream: com.javahelps.stream.Temperature:1.0.0
Message Format: wso2event

Compare the ports available for the Thrift protocol with the SSL_PORT and TCP_PORT in PublisherClient.py. If there are any differences, update the SSL_PORT and TCP_PORT values.
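
A common reason for the ports to differ is a port offset on the server. The sketch below assumes that the CEP was started with such an offset and that the offset shifts both Thrift ports by the same amount; that behaviour is an assumption here, so the ports shown in the dashboard remain the authority. effective_thrift_ports is a hypothetical helper.

```python
DEFAULT_THRIFT_SSL_PORT = 7711
DEFAULT_THRIFT_TCP_PORT = 7611


def effective_thrift_ports(port_offset=0):
    """Return (ssl_port, tcp_port) after applying an assumed port offset."""
    return (DEFAULT_THRIFT_SSL_PORT + port_offset,
            DEFAULT_THRIFT_TCP_PORT + port_offset)


# Example: a server started with an offset of 1
SSL_PORT, TCP_PORT = effective_thrift_ports(port_offset=1)
print('%d %d' % (SSL_PORT, TCP_PORT))
```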

Step 13:
Add another publisher to publish the events to our Python servers. Use the following configuration.
Event Publisher Name: com.javahelps.publisher.thrift.Temperature
Event Source: com.javahelps.stream.Temperature
Output Event Adapter Type: wso2event
Receiver URL: tcp://localhost:8888
Authenticator URL: ssl://localhost:8988
Username: admin (can be anything; our authenticator accepts any username)
Password: admin (can be anything; our authenticator accepts any password)
Protocol: thrift
Publishing Mode: non-blocking
Publishing Timeout: 0
Message Format: wso2event

Step 14:
Start TCPServer.py and SSLServer.py using the following commands, each in its own terminal because both scripts run until interrupted.
python TCPServer.py
python SSLServer.py
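
Before sending an event, you may want to confirm that both servers are actually listening. The following sketch uses only the standard socket module; is_listening is a hypothetical helper. Note that it opens and immediately closes a connection, so the servers may log a short-lived client or a failed handshake.

```python
import socket


def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be established."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        # connect_ex returns 0 on success and an error code otherwise
        return sock.connect_ex((host, port)) == 0
    finally:
        sock.close()


for name, port in (('TCPServer.py', 8888), ('SSLServer.py', 8988)):
    state = 'listening' if is_listening('localhost', port) else 'not reachable'
    print('%s on port %d: %s' % (name, port, state))
```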

Step 15:
Send an event to the WSO2 CEP server from PublisherClient using the following command and see the results in the terminal.
python PublisherClient.py


We have now created a Thrift event publisher and receiver that send events to and receive events from WSO2 Complex Event Processor. To implement such a client in a different language, you only need to change the Python-related steps. The Python clients along with the CEP artifacts are available at the following GitHub repository.

Find the project @ GitHub.