WSO2 CEP - Publish Events Using Java Client

The previous article, WSO2 CEP - Hello World!, explained how to set up WSO2 CEP with a simple event processor. This article shows how to send events to the CEP from a Java client. The client is nothing more than an HTTP client that sends each event to the CEP in an HTTP POST request.

Step 0:
Follow the previous article and set up the CEP engine. This article uses the event processor and receiver defined there to test the Java client.

Step 1:
Create a new simple Maven project in Eclipse.

Step 2:
Enter the Group Id "com.javahelps" and the Artifact Id "cepclient", then click the "Finish" button.

Step 3:
Add the following repositories to Maven's pom.xml.
<repositories>
    <repository>
        <id>wso2-nexus</id>
        <name>WSO2 internal Repository</name>
        <url>http://maven.wso2.org/nexus/content/groups/wso2-public/</url>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>daily</updatePolicy>
            <checksumPolicy>ignore</checksumPolicy>
        </releases>
    </repository>
    <repository>
        <id>wso2.releases</id>
        <name>WSO2 internal Repository</name>
        <url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>daily</updatePolicy>
            <checksumPolicy>ignore</checksumPolicy>
        </releases>
    </repository>
    <repository>
        <id>wso2.snapshots</id>
        <name>Apache Snapshot Repository</name>
        <url>http://maven.wso2.org/nexus/content/repositories/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
            <updatePolicy>daily</updatePolicy>
        </snapshots>
        <releases>
            <enabled>false</enabled>
        </releases>
    </repository>
</repositories>


Step 4:
Add the following dependencies to the pom.xml.
<dependencies>
    <dependency>
        <groupId>org.apache.ws.commons.axiom.wso2</groupId>
        <artifactId>axiom</artifactId>
        <version>1.2.11.wso2v6</version>
    </dependency>
    <dependency>
        <groupId>org.wso2.orbit.org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.3.1.wso2v2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents.wso2</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.3.3.wso2v1</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.1</version>
    </dependency>
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>1.2</version>
    </dependency>
</dependencies>

After adding both repositories and dependencies, the pom.xml should look like this:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javahelps</groupId>
    <artifactId>cepclient</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>wso2-nexus</id>
            <name>WSO2 internal Repository</name>
            <url>http://maven.wso2.org/nexus/content/groups/wso2-public/</url>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>daily</updatePolicy>
                <checksumPolicy>ignore</checksumPolicy>
            </releases>
        </repository>
        <repository>
            <id>wso2.releases</id>
            <name>WSO2 internal Repository</name>
            <url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>daily</updatePolicy>
                <checksumPolicy>ignore</checksumPolicy>
            </releases>
        </repository>
        <repository>
            <id>wso2.snapshots</id>
            <name>Apache Snapshot Repository</name>
            <url>http://maven.wso2.org/nexus/content/repositories/snapshots/</url>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>daily</updatePolicy>
            </snapshots>
            <releases>
                <enabled>false</enabled>
            </releases>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.ws.commons.axiom.wso2</groupId>
            <artifactId>axiom</artifactId>
            <version>1.2.11.wso2v6</version>
        </dependency>
        <dependency>
            <groupId>org.wso2.orbit.org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.1.wso2v2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents.wso2</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.3.3.wso2v1</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.1</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
    </dependencies>
</project>

Step 5:
Create a new package "com.javahelps.cepclient" in the "src/main/java" source folder.

Step 6:
Create a new class Main.java with a main method inside that package.

Step 7:
Add the TemperatureReceiver URL as a static variable to the Main class. This is the same URL used in Step 16 of the WSO2 CEP - Hello World! article.
package com.javahelps.cepclient;

public class Main {
    /**
     * URL of the event receiver.
     */
    private static final String URL = "http://localhost:9763/endpoints/TemperatureReceiver";

    public static void main(String[] args) {
   
    }
}
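
The receiver URL has a fixed shape: the CEP host, its HTTP port, the "/endpoints/" path and the receiver name. As a minimal sketch, assuming that shape also holds for other receivers, the URL could be assembled from its parts instead of being hard-coded. ReceiverUrl and build are hypothetical names, not part of any WSO2 API.

```java
/**
 * Hypothetical helper (not part of WSO2): assembles a receiver URL from its
 * parts, assuming the http://host:port/endpoints/name shape used in this
 * article.
 */
final class ReceiverUrl {

    private ReceiverUrl() {
    }

    static String build(String host, int port, String receiverName) {
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("host must not be empty");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        if (receiverName == null || receiverName.isEmpty()) {
            throw new IllegalArgumentException("receiverName must not be empty");
        }
        return "http://" + host + ":" + port + "/endpoints/" + receiverName;
    }

    public static void main(String[] args) {
        // Produces the same value as the URL constant in the Main class.
        System.out.println(build("localhost", 9763, "TemperatureReceiver"));
    }
}
```

Keeping the host and port as separate values makes it easier to point the client at a CEP instance that does not run on the local machine.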

Step 8:
Create an HttpClient and an HttpPost method as shown below.
package com.javahelps.cepclient;

import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.HttpClientBuilder;

public class Main {
    /**
     * URL of the event receiver.
     */
    private static final String URL = "http://localhost:9763/endpoints/TemperatureReceiver";

    public static void main(String[] args) {
        // Create an HTTP client.
        HttpClient httpClient = HttpClientBuilder.create().build();

        // Create a POST method using the receiver URL.
        HttpPost method = new HttpPost(URL);
    }
}

Step 9:
The TemperatureReceiver requires the event as JSON data in the following format:
{
    "event": {
        "payloadData": {
            "temp": 15.6
        }
    }
}
Create a JsonObject that matches the required input format as shown below.
package com.javahelps.cepclient;

import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.HttpClientBuilder;

import com.google.gson.JsonObject;

public class Main {
    /**
     * URL of the event receiver.
     */
    private static final String URL = "http://localhost:9763/endpoints/TemperatureReceiver";

    public static void main(String[] args) {
        // Create an HTTP client.
        HttpClient httpClient = HttpClientBuilder.create().build();

        // Create a POST method using the receiver URL.
        HttpPost method = new HttpPost(URL);

        // Create a JsonObject for the event.
        JsonObject event = new JsonObject();

        // Create another JsonObject for pay-load data.
        JsonObject payLoadData = new JsonObject();

        // Set the temperature.
        payLoadData.addProperty("temp", 42.6f);

        event.add("payloadData", payLoadData);

        // Convert the JsonObject to String.
        String eventString = "{\"event\": " + event + "}";
    }
}
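
The nesting of "event" and "payloadData" is easy to get wrong, so it can help to see the whole payload produced in one place. The following is a minimal sketch that builds the same structure with plain string concatenation instead of Gson, so that it runs without any dependency. EventJson and forTemperature are hypothetical names, and the output is compact (no whitespace), which is equivalent JSON to the formatted sample in this step.

```java
/**
 * Hypothetical helper: builds the event payload expected by the
 * TemperatureReceiver without any JSON library.
 */
final class EventJson {

    private EventJson() {
    }

    static String forTemperature(double temp) {
        // JSON has no literal for NaN or Infinity, so reject them early.
        if (Double.isNaN(temp) || Double.isInfinite(temp)) {
            throw new IllegalArgumentException("temp must be a finite number: " + temp);
        }
        // The only value in the payload is a number, so no string escaping is needed.
        return "{\"event\":{\"payloadData\":{\"temp\":" + temp + "}}}";
    }

    public static void main(String[] args) {
        // Prints {"event":{"payloadData":{"temp":15.6}}}
        System.out.println(forTemperature(15.6));
    }
}
```

For payloads that contain strings or more attributes, a JSON library such as Gson remains the safer choice because it handles escaping.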

Step 10:
Finally, create a StringEntity and send the request to the CEP.
package com.javahelps.cepclient;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;

import com.google.gson.JsonObject;

public class Main {
    /**
     * URL of the event receiver.
     */
    private static final String URL = "http://localhost:9763/endpoints/TemperatureReceiver";

    public static void main(String[] args) {
        // Create an HTTP client.
        HttpClient httpClient = HttpClientBuilder.create().build();

        // Create a POST method using the receiver URL.
        HttpPost method = new HttpPost(URL);

        // Create a JsonObject for the event.
        JsonObject event = new JsonObject();

        // Create another JsonObject for pay-load data.
        JsonObject payLoadData = new JsonObject();

        // Set the temperature.
        payLoadData.addProperty("temp", 42.6f);

        event.add("payloadData", payLoadData);

        // Convert the JsonObject to String.
        String eventString = "{\"event\": " + event + "}";

        try {
            // Create an entity and add it to the method.
            StringEntity entity = new StringEntity(eventString);
            method.setEntity(entity);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        try {
            // Execute the method and retrieve the response.
            HttpResponse response = httpClient.execute(method);

            // Get the entity out of the response.
            HttpEntity httpEntity = response.getEntity();

            int status = response.getStatusLine().getStatusCode();

            // Check the status code for successful completion.
            if (status == 200) {
                System.out.println("Published");
            } else {
                System.out.println("Failed to publish");
            }

            // Close the connection and release the resources.
            try {
                EntityUtils.consume(httpEntity);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } catch (ClientProtocolException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
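
Because the client is only an HTTP POST, the same request can also be sketched without the Apache HttpClient dependency. The example below assumes Java 11 or later and uses the JDK's java.net.http package; it is an alternative under that assumption, not the approach used in this article. JdkPublisher and its methods are hypothetical names, the Content-Type header is an addition that the original client does not set, and treating every 2xx status as success is a design choice rather than documented CEP behavior.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/**
 * Hypothetical JDK-only publisher (assumes Java 11+).
 */
final class JdkPublisher {

    private JdkPublisher() {
    }

    /** Builds a POST request that carries the event JSON as its body. */
    static HttpRequest buildRequest(String url, String eventJson) {
        return HttpRequest.newBuilder(URI.create(url))
                // Assumption: the receiver accepts an explicit JSON content type.
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(eventJson))
                .build();
    }

    /** Treats any 2xx status as a successful publish. */
    static boolean isSuccess(int status) {
        return status >= 200 && status < 300;
    }

    /** Sends the event and reports whether the receiver accepted it. */
    static boolean publish(String url, String eventJson)
            throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        // The response body is not needed, so it is discarded.
        HttpResponse<Void> response = client.send(
                buildRequest(url, eventJson),
                HttpResponse.BodyHandlers.discarding());
        return isSuccess(response.statusCode());
    }

    public static void main(String[] args) throws Exception {
        String url = "http://localhost:9763/endpoints/TemperatureReceiver";
        String json = "{\"event\":{\"payloadData\":{\"temp\":42.6}}}";
        System.out.println(publish(url, json) ? "Published" : "Failed to publish");
    }
}
```

Running main requires the CEP to be up, exactly as with the Apache HttpClient version.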

Step 11:
Now save all the changes and run the application. (Make sure that the CEP is up and running.)

Step 12:
Run the application a few times with different temperature values. After 1 minute, the CEP prints the peak temperature in the terminal used to start it, because the publisher created in the previous article writes to the CEP's own console.
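
To try different values without editing the source each time, the temperatures could be read from the command line. The sketch below only parses the arguments and computes the highest value locally. That is the number to expect on the CEP console if the execution plan from the previous article reports the maximum temperature of the window, which is an assumption because the query is not repeated here. Readings, parse and peak are hypothetical names, and the actual publishing call is left out.

```java
import java.util.Arrays;

/**
 * Hypothetical helper: reads temperatures from command-line arguments
 * and computes the peak that the CEP is expected to report.
 */
final class Readings {

    private Readings() {
    }

    /** Converts each argument to a temperature value. */
    static double[] parse(String... args) {
        double[] temps = new double[args.length];
        for (int i = 0; i < args.length; i++) {
            try {
                temps[i] = Double.parseDouble(args[i].trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Not a temperature: " + args[i], e);
            }
        }
        return temps;
    }

    /** Returns the highest temperature, or fails if none were given. */
    static double peak(double[] temps) {
        return Arrays.stream(temps)
                .max()
                .orElseThrow(() -> new IllegalArgumentException("No temperatures given"));
    }

    public static void main(String[] args) {
        double[] temps = parse(args);
        for (double temp : temps) {
            // Publish each reading here, for example with the code from Step 10.
            System.out.println("Would publish temp = " + temp);
        }
        if (temps.length > 0) {
            System.out.println("Expected peak: " + peak(temps));
        }
    }
}
```

All readings must reach the CEP within the same one-minute window for the printed peak to match the local value.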

Find the project on GitHub.

Reference: Publishing JSON/REST Data using Java Client

2 comments

cancobanoglu, August 7, 2017 at 5:37 PM

Hi, I have a question about how to post Siddhi SQL for a specific event stream to the WSO2 CEP server from an application. I mean, I would like to post the SQL as JSON to an endpoint of the WSO2 CEP server.

Thanks in advance

Gobinath, August 11, 2017 at 11:12 PM

Hi,
To deploy a Siddhi execution plan through an endpoint, WSO2 provides admin services. Search for WSO2 DAS admin services.
