In the series of WSO2 CEP tutorials, this article explains how to create an Apache Thrift publisher and receiver for a CEP server in Python. Even though this is javahelps.com, I use Python since a publisher and receiver in Java are already available in the WSO2 CEP samples.
One of the major advantages of Apache Thrift is its support for various platforms. Therefore this tutorial can be adapted to other languages with little effort.
WSO2 CEP supports various protocols for sending and receiving events. However, TCP is recommended when performance is a critical requirement. To achieve security without a performance trade-off, an SSL connection is used to authenticate the client, and the actual events are then sent to the interested parties through unencrypted TCP connections.
Requirements:
- Apache Thrift
- Python 2
- Java Development Kit 1.7 or later
- WSO2 Complex Event Processor 4.2.0
Setting up the environment
Step 1:
If you don't have the Apache Thrift compiler, install it using the following command. Windows and Mac users, please find the binary files here.
sudo apt install thrift-compiler
Install the Python thrift library using the following command. Windows and Mac users must install this library using pip or manually include the library into the project.
sudo apt install python-thrift
Step 2:
Download and extract the WSO2 CEP pack. Start the server using the following command.
sh <CEP_HOME>/bin/wso2server.sh
Step 3:
Create a new stream using the following properties.
Event Stream Name | com.javahelps.stream.Temperature |
Payload data attributes | |
Step 4:
Create a publisher for testing purposes using the following configurations:
Event Publisher Name | com.javahelps.publisher.logger.Temperature |
Event Source | com.javahelps.stream.Temperature |
Output Event Adapter Type | logger |
Message Format | text |
Now that the required platform is ready, let's jump into coding.
Step 1:
Create a new workspace directory wherever you like. Create a child directory wso2-thrift inside it and download the following files into the wso2-thrift directory from the WSO2 GitHub repository.
- Data.thrift
- Exception.thrift
- ThriftEventTransmissionService.thrift
- ThriftSecureEventTransmissionService.thrift
Step 2:
Open the terminal inside the parent directory and execute the following commands to generate Python source files.
thrift -r --gen py wso2-thrift/Data.thrift
thrift -r --gen py wso2-thrift/Exception.thrift
thrift -r --gen py wso2-thrift/ThriftEventTransmissionService.thrift
thrift -r --gen py wso2-thrift/ThriftSecureEventTransmissionService.thrift
After executing these commands, you will get a gen-py directory which contains the generated source files.
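Before moving on, it can help to confirm that all four packages were generated. The following is a minimal sketch, assuming the package names match the four .thrift file names (as in the directory structure shown later in this article); `missing_packages` and `EXPECTED_PACKAGES` are hypothetical names introduced only for this check.

```python
import os

# Package names assumed to match the four .thrift files used in this tutorial.
EXPECTED_PACKAGES = [
    'Data',
    'Exception',
    'ThriftEventTransmissionService',
    'ThriftSecureEventTransmissionService',
]


def missing_packages(gen_dir='gen-py'):
    """Return the expected packages that have no __init__.py under gen_dir."""
    return [name for name in EXPECTED_PACKAGES
            if not os.path.isfile(os.path.join(gen_dir, name, '__init__.py'))]
```

Calling `missing_packages()` from the parent directory should return an empty list when every thrift command has succeeded.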
Step 3:
The Thrift receiver will run as a server, which requires an SSL certificate. Create the certificate using the following command. You need to answer the questions asked by openssl during this process.
openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout server.key -out server.crt
For example:
Country Name (2 letter code) [AU] | CA |
State or Province Name (full name) [Some-State] | Ontario |
Locality Name (eg, city) [] | London |
Organization Name (eg, company) [Internet Widgits Pty Ltd] | Western University |
Organizational Unit Name (eg, section) [] | Faculty Of Engineering |
Common Name (e.g. server FQDN or YOUR name) [] | Gobinath |
Email Address [] | admin@javahelps.com |
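If you prefer to skip the interactive questions, `openssl req` also accepts the answers through its `-subj` option. The sketch below only builds the subject string and the argument list from the example answers above; `build_subject` and `openssl_command` are hypothetical helper names, and values containing a `/` would need escaping, which this sketch does not handle.

```python
# Field abbreviations understood by the -subj option of `openssl req`,
# paired with the example answers from the table above.
SUBJECT_FIELDS = [
    ('C', 'CA'),
    ('ST', 'Ontario'),
    ('L', 'London'),
    ('O', 'Western University'),
    ('OU', 'Faculty Of Engineering'),
    ('CN', 'Gobinath'),
    ('emailAddress', 'admin@javahelps.com'),
]


def build_subject(fields):
    """Join (key, value) pairs into an OpenSSL subject string."""
    return ''.join('/%s=%s' % (key, value) for key, value in fields)


def openssl_command(subject):
    """Return the argument list for a non-interactive certificate request."""
    return ['openssl', 'req', '-x509', '-nodes', '-days', '365',
            '-newkey', 'rsa:2048', '-keyout', 'server.key',
            '-out', 'server.crt', '-subj', subject]
```

The returned list can be passed to `subprocess.check_call` to run the same command as above without prompts.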
Step 5:
Once you have created the server.crt and server.key files, combine them into a single server.pem file using the following command.
cat server.crt server.key > server.pem
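On systems without `cat` (for example Windows), the same concatenation can be done in Python. This is a small sketch; `combine_pem` is a hypothetical helper name. Unlike `cat`, it also adds a newline if an input file does not end with one, so the two PEM blocks stay on separate lines.

```python
def combine_pem(cert_path, key_path, pem_path):
    """Write the certificate followed by the private key into pem_path."""
    with open(pem_path, 'wb') as pem:
        for path in (cert_path, key_path):
            with open(path, 'rb') as part:
                data = part.read()
            pem.write(data)
            # Keep the PEM blocks on separate lines if a file lacks a final newline.
            if not data.endswith(b'\n'):
                pem.write(b'\n')
```

For this tutorial the call would be `combine_pem('server.crt', 'server.key', 'server.pem')`.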
Step 6:
To make the CEP trust the certificate of our SSL server, import it into the CEP client truststore using the following command and restart the CEP.
keytool -importcert -file server.crt -keystore <CEP_HOME>/repository/resources/security/client-truststore.jks -alias "PythonThriftClient"
In the above command, the alias PythonThriftClient can be anything and the <CEP_HOME> must be replaced by the actual home directory of WSO2 CEP.
When prompted, enter the keystore password 'wso2carbon' and confirm to trust the certificate.
Step 7:
Create a new file ServerHandler.py with the following code:
#!/usr/bin/env python


class ServerHandler(object):
    """
    ServerHandler contains the functions to serve the requests from WSO2 CEP.
    """

    def __init__(self):
        pass

    def connect(self, uname, password):
        """
        Receive the username and password, verify it and return a unique session id.
        """
        print 'Connect ' + uname + ':' + password
        return '123456'  # A fixed placeholder session id; nothing is verified here

    def disconnect(self, sessionId):
        """
        Destroy the session.
        """
        print 'Disconnect the session ' + str(sessionId)

    def defineStream(self, sessionId, streamDefinition):
        return ''

    def findStreamId(self, sessionId, streamName, streamVersion):
        return ''

    def publish(self, eventBundle):
        """
        Receive the event and process it.
        """
        print 'Received a new event: ' + str(eventBundle) + '\n'

    def deleteStreamById(self, sessionId, streamId):
        pass

    def deleteStreamByNameVersion(self, sessionId, streamName, streamVersion):
        pass
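The handler above returns the fixed value '123456' from connect, which is enough for a demo. If you want a different session id per connection, one possible approach is sketched below. `SessionRegistry` is a hypothetical helper that is not part of the generated Thrift code, and using `uuid` is my own choice rather than anything WSO2 CEP requires.

```python
import uuid


class SessionRegistry(object):
    """Hypothetical helper that issues and forgets session ids."""

    def __init__(self):
        self._sessions = {}

    def open(self, username):
        """Create a new session for username and return its id."""
        session_id = uuid.uuid4().hex   # 32 hex characters, unique per call
        self._sessions[session_id] = username
        return session_id

    def close(self, session_id):
        """Forget the session; return its username, or None if unknown."""
        return self._sessions.pop(session_id, None)

    def is_active(self, session_id):
        return session_id in self._sessions
```

A handler could call `open` from connect, `close` from disconnect, and `is_active` before accepting an event.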
Step 8:
Create a new file TCPServer.py with the following code:
#!/usr/bin/env python
import sys
sys.path.append('gen-py')
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
from ServerHandler import ServerHandler
from ThriftEventTransmissionService import ThriftEventTransmissionService
SERVER_ADDRESS = 'localhost'
SERVER_PORT = 8888
handler = ServerHandler()
processor = ThriftEventTransmissionService.Processor(handler)
transport = TSocket.TServerSocket(SERVER_ADDRESS, SERVER_PORT)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
print 'Starting TCP Server at ' + SERVER_ADDRESS + ':' + str(SERVER_PORT)
server.serve()
Note the server port is 8888
Step 9:
Create a new file SSLServer.py with the following code:
#!/usr/bin/env python
import sys
import os
sys.path.append('gen-py')
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
from thrift.transport import TSSLSocket
from ServerHandler import ServerHandler
from ThriftSecureEventTransmissionService import ThriftSecureEventTransmissionService
SERVER_ADDRESS = 'localhost'
SERVER_PORT = 8988
CERTIFICATE_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'server.pem')
handler = ServerHandler()
processor = ThriftSecureEventTransmissionService.Processor(handler)
transport = TSSLSocket.TSSLServerSocket(SERVER_ADDRESS, SERVER_PORT, certfile=CERTIFICATE_FILE)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
print 'Starting SSL Server at ' + SERVER_ADDRESS + ':' + str(SERVER_PORT)
server.serve()
Note the server port is 8988
Step 10:
Create a new file Publisher.py with the following code:
#!/usr/bin/env python
import sys
import time
sys.path.append('gen-py')

import Data.ttypes  # ThriftEventBundle is defined in the generated Data package
from ThriftSecureEventTransmissionService import ThriftSecureEventTransmissionService
from ThriftEventTransmissionService import ThriftEventTransmissionService
from ThriftEventTransmissionService.ttypes import *
from thrift import Thrift
from thrift.transport import TSSLSocket
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol


class Publisher:
    """
    Create SSL and TCP sockets along with buffer and binary protocol.
    """

    def __init__(self, ip, ssl_port, tcp_port):
        # Make SSL socket
        self.ssl_socket = TSSLSocket.TSSLSocket(ip, ssl_port, False)
        self.ssl_transport = TTransport.TBufferedTransport(self.ssl_socket)
        self.ssl_protocol = TBinaryProtocol.TBinaryProtocol(self.ssl_transport)
        # Make TCP socket
        self.tcp_socket = TSocket.TSocket(ip, tcp_port)
        self.tcp_transport = TTransport.TBufferedTransport(self.tcp_socket)
        self.tcp_protocol = TBinaryProtocol.TBinaryProtocol(self.tcp_transport)

    def connect(self, username, password):
        # Create a client to use the protocol encoder
        self.ssl_client = ThriftSecureEventTransmissionService.Client(self.ssl_protocol)
        self.tcp_client = ThriftEventTransmissionService.Client(self.tcp_protocol)
        # Open the SSL connection and authenticate to get a session id
        self.ssl_socket.open()
        self.sessionId = self.ssl_client.connect(username, password)
        # Open the TCP connection used for the events
        self.tcp_socket.open()

    def defineStream(self, streamDef):
        # Create Stream Definition
        return self.tcp_client.defineStream(self.sessionId, streamDef)

    def publish(self, streamId, *attributes):
        # Build thrift event bundle
        event = EventBundle()
        event.setSessionId(self.sessionId)
        event.setEventNum(1)
        # Timestamp in milliseconds, converted to an integer type
        event.addLongAttribute(long(time.time() * 1000))
        event.addStringAttribute(streamId)
        for attr in attributes:
            # bool is a subclass of int, so it must be tested before int
            if isinstance(attr, bool):
                event.addBoolAttribute(attr)
            elif isinstance(attr, int):
                event.addIntAttribute(attr)
            elif isinstance(attr, basestring):
                event.addStringAttribute(attr)
            elif isinstance(attr, long):
                event.addLongAttribute(attr)
            elif isinstance(attr, float):
                event.addDoubleAttribute(attr)
            else:
                event.setArbitraryDataMapMap(attr)
        # Publish
        self.tcp_client.publish(event.getEventBundle())

    def disconnect(self):
        # Disconnect
        self.ssl_client.disconnect(self.sessionId)
        self.ssl_transport.close()
        self.ssl_socket.close()
        self.tcp_transport.close()
        self.tcp_socket.close()


class EventBundle:
    def __init__(self):
        # Instance attributes, so that every bundle starts with empty lists
        self.__sessionId = ""
        self.__eventNum = 0
        self.__intAttributeList = []
        self.__longAttributeList = []
        self.__doubleAttributeList = []
        self.__boolAttributeList = []
        self.__stringAttributeList = []
        self.__arbitraryDataMapMap = None

    def setSessionId(self, sessionId):
        self.__sessionId = sessionId

    def setEventNum(self, num):
        self.__eventNum = num

    def addIntAttribute(self, attr):
        self.__intAttributeList.append(attr)

    def addLongAttribute(self, attr):
        self.__longAttributeList.append(attr)

    def addDoubleAttribute(self, attr):
        self.__doubleAttributeList.append(attr)

    def addBoolAttribute(self, attr):
        self.__boolAttributeList.append(attr)

    def addStringAttribute(self, attr):
        self.__stringAttributeList.append(attr)

    def setArbitraryDataMapMap(self, attr):
        self.__arbitraryDataMapMap = attr

    def getEventBundle(self):
        return Data.ttypes.ThriftEventBundle(self.__sessionId, self.__eventNum, self.__intAttributeList, self.__longAttributeList, self.__doubleAttributeList, self.__boolAttributeList, self.__stringAttributeList, self.__arbitraryDataMapMap)
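The event bundle groups attributes into one list per type, so the order of the type checks matters. In Python, bool is a subclass of int, which means a bool check can only be reached if it runs before the int check. The following standalone sketch illustrates that dispatch without needing Thrift. `bucket_attributes` is a hypothetical function, and unlike the Publisher class it rejects unsupported types instead of storing them as arbitrary data.

```python
try:                                # Python 2
    STRING_TYPES = basestring
    LONG_TYPE = long
except NameError:                   # Python 3 has neither name
    STRING_TYPES = str
    LONG_TYPE = None


def bucket_attributes(stream_id, timestamp_ms, attributes):
    """Sort attributes into per-type lists, similar to the bundle layout."""
    buckets = {'int': [], 'long': [timestamp_ms], 'double': [],
               'bool': [], 'string': [stream_id]}
    for attr in attributes:
        if isinstance(attr, bool):          # must come before the int check
            buckets['bool'].append(attr)
        elif isinstance(attr, int):
            buckets['int'].append(attr)
        elif LONG_TYPE is not None and isinstance(attr, LONG_TYPE):
            buckets['long'].append(attr)
        elif isinstance(attr, float):
            buckets['double'].append(attr)
        elif isinstance(attr, STRING_TYPES):
            buckets['string'].append(attr)
        else:
            raise TypeError('unsupported attribute: %r' % (attr,))
    return buckets
```

With the attributes 'Kitchen' and 56, the string list holds the stream id followed by 'Kitchen', and the int list holds 56.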
Step 11:
Create a new file PublisherClient.py with the following code:
#!/usr/bin/env python
from Publisher import Publisher
CEP_SERVER_ADDRESS = '192.168.122.1' # IP address of the server. You can find it at the end of the CEP console
SSL_PORT = 7711 # Thrift SSL port of the server
TCP_PORT = 7611 # Thrift TCP port of the server
USERNAME = 'admin' # Username
PASSWORD = 'admin' # Password
publisher = Publisher(CEP_SERVER_ADDRESS, SSL_PORT, TCP_PORT)
# Connect to server with username and password
publisher.connect(USERNAME, PASSWORD)
# Publish an event to the Temperature stream
publisher.publish('com.javahelps.stream.Temperature:1.0.0', 'Kitchen', 56)
# Disconnect
publisher.disconnect()
Change the CEP_SERVER_ADDRESS according to your system. The SSL_PORT and TCP_PORT values given here are the defaults used by the CEP. If your CEP uses different ports, we will update them in Step 12.
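The stream identifier passed to publish combines the stream name and the stream version with a colon, as in 'com.javahelps.stream.Temperature:1.0.0'. If you keep the name and version separately, a pair of small helpers can convert between the two forms. This is only a sketch; `split_stream_id` and `join_stream_id` are hypothetical names.

```python
def split_stream_id(stream_id):
    """Split 'name:version' into (name, version)."""
    name, sep, version = stream_id.rpartition(':')
    if not sep or not name or not version:
        raise ValueError('expected <name>:<version>, got %r' % (stream_id,))
    return name, version


def join_stream_id(name, version):
    """Build the 'name:version' identifier."""
    return '%s:%s' % (name, version)
```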
See the complete directory structure below:
.
├── gen-py
│   ├── Data
│   │   ├── constants.py
│   │   ├── __init__.py
│   │   ├── __init__.pyc
│   │   ├── ttypes.py
│   │   └── ttypes.pyc
│   ├── Exception
│   │   ├── constants.py
│   │   ├── __init__.py
│   │   ├── __init__.pyc
│   │   ├── ttypes.py
│   │   └── ttypes.pyc
│   ├── __init__.py
│   ├── ThriftEventTransmissionService
│   │   ├── constants.py
│   │   ├── __init__.py
│   │   ├── __init__.pyc
│   │   ├── ThriftEventTransmissionService.py
│   │   ├── ThriftEventTransmissionService.pyc
│   │   ├── ThriftEventTransmissionService-remote
│   │   ├── ttypes.py
│   │   └── ttypes.pyc
│   └── ThriftSecureEventTransmissionService
│       ├── constants.py
│       ├── __init__.py
│       ├── __init__.pyc
│       ├── ThriftSecureEventTransmissionService.py
│       ├── ThriftSecureEventTransmissionService.pyc
│       ├── ThriftSecureEventTransmissionService-remote
│       ├── ttypes.py
│       └── ttypes.pyc
├── PublisherClient.py
├── Publisher.py
├── server.crt
├── ServerHandler.py
├── server.key
├── server.pem
├── SSLServer.py
├── TCPServer.py
└── wso2-thrift
    ├── Data.thrift
    ├── Exception.thrift
    ├── ThriftEventTransmissionService.thrift
    └── ThriftSecureEventTransmissionService.thrift
Step 12:
Go to the CEP dashboard and create a new WSO2 event receiver as shown below.
Event Receiver Name | com.javahelps.receiver.Temperature |
Input Event Adapter Type | wso2event |
Event Stream | com.javahelps.stream.Temperature:1.0.0 |
Message Format | wso2event |
Compare the ports available for the Thrift protocol with the SSL_PORT and TCP_PORT in PublisherClient.py. If there are any differences, update the SSL_PORT and TCP_PORT values.
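If you are unsure whether the CEP Thrift ports are reachable from the machine running the publisher, a plain TCP probe can tell you. The sketch below uses only the standard socket module; `port_is_open` is a hypothetical helper, and it only checks that something is listening, not that it speaks Thrift.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        return sock.connect_ex((host, port)) == 0
    except socket.error:
        # Covers timeouts and name resolution failures.
        return False
    finally:
        sock.close()
```

For example, `port_is_open(CEP_SERVER_ADDRESS, 7611)` and `port_is_open(CEP_SERVER_ADDRESS, 7711)` would probe the two default ports used in PublisherClient.py.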
Step 13:
Add another publisher to publish the events to our Python servers. Use the following configuration.
Event Publisher Name | com.javahelps.publisher.thrift.Temperature |
Event Source | com.javahelps.stream.Temperature |
Output Event Adapter Type | wso2event |
Receiver URL | tcp://localhost:8888 |
Authenticator URL | ssl://localhost:8988 |
Username | admin (Can be anything. Our authenticator accepts any username) |
Password | admin (Can be anything. Our authenticator accepts any password) |
Protocol | thrift |
Publishing Mode | non-blocking |
Publishing Timeout | 0 |
Message Format | wso2event |
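The Receiver URL and Authenticator URL must point to the ports used by TCPServer.py (8888) and SSLServer.py (8988). If you change either side, a quick way to cross-check the values is to parse the URLs. This is a small sketch using the standard library; `endpoint` is a hypothetical helper name.

```python
try:
    from urllib.parse import urlparse   # Python 3
except ImportError:
    from urlparse import urlparse       # Python 2


def endpoint(url):
    """Return (scheme, host, port) for a URL such as tcp://localhost:8888."""
    parts = urlparse(url)
    if parts.hostname is None or parts.port is None:
        raise ValueError('URL needs a host and a port: %r' % (url,))
    return parts.scheme, parts.hostname, parts.port
```

Here `endpoint('tcp://localhost:8888')` returns `('tcp', 'localhost', 8888)`, which can be compared with SERVER_ADDRESS and SERVER_PORT in the server scripts.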
Step 14:
Start TCPServer.py and SSLServer.py using the following commands. Each server blocks its terminal, so run them in separate terminals.
python TCPServer.py
python SSLServer.py
Step 15:
Send an event to the WSO2 CEP server from PublisherClient using the following command and see the results in the terminal.
python PublisherClient.py
We have now created a Thrift event publisher and receiver to send events to and receive events from WSO2 Complex Event Processor. To implement such a client in a different language, you only need to change the Python-related steps. The Python clients along with the CEP artifacts are available at the following GitHub repository.
Find the project @ GitHub.