Fork/Join Framework

Java gives programmers a strong multithreading platform and adds new features to it from time to time. The Fork/Join framework is one such enhancement, introduced in Java 7 (1.7) to make better use of multicore processors in multithreaded programs. You need a basic knowledge of Java threads to understand the Fork/Join framework. Since this topic is closely tied to the performance of your application, I have listed the specifications of the computer I used to run the sample code. To see the real difference between an application that uses the Fork/Join framework and a single-threaded application, you need to run the samples on a multicore processor.
  • Processor: Intel(R) Core(TM) i5 CPU 2.53GHz
  • RAM: 6GB of DDR3
  • Operating System: Ubuntu 14.04 64-bit
  • Java: JDK 1.8 64-bit
I am using the Merge Sort algorithm, which is a divide and conquer algorithm, to explain the Fork/Join framework concepts. Divide and conquer is a technique of breaking a problem into smaller sub problems until we reach the simplest form of that problem. The problem is then solved recursively by solving the sub problems first. For example, to sort the array {6, 5, 3, 1, 8, 7, 2, 4}, first we divide it into two pieces, {6, 5, 3, 1} and {8, 7, 2, 4}. Then we divide those further to get four arrays with two elements each. The same process continues until every array has a single element. Once you have those arrays, start merging by comparing elements from adjacent arrays.
For further details about Merge Sort: Wikipedia.
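The listings in this article leave out the merge method, so here is a minimal sketch of the whole divide and merge cycle applied to the example array. The class name MergeSortSketch and the use of temporary copies in merge are my assumptions for illustration; the complete source may implement the merge differently.

```java
import java.util.Arrays;

// Hypothetical class for illustration; not the code from the complete source.
class MergeSortSketch {

    // Sorts array[left..right] in place (both bounds inclusive).
    static void sort(int[] array, int left, int right) {
        if (left < right) {
            int mid = left + (right - left) / 2;
            sort(array, left, mid);         // divide: left half
            sort(array, mid + 1, right);    // divide: right half
            merge(array, left, mid, right); // conquer: merge the sorted halves
        }
    }

    // Merges the sorted runs array[left..mid] and array[mid+1..right].
    static void merge(int[] array, int left, int mid, int right) {
        int[] leftRun = Arrays.copyOfRange(array, left, mid + 1);
        int[] rightRun = Arrays.copyOfRange(array, mid + 1, right + 1);
        int i = 0;
        int j = 0;
        int k = left;
        // Repeatedly take the smaller head element of the two runs.
        while (i < leftRun.length && j < rightRun.length) {
            array[k++] = (leftRun[i] <= rightRun[j]) ? leftRun[i++] : rightRun[j++];
        }
        // Copy whatever remains in either run.
        while (i < leftRun.length) {
            array[k++] = leftRun[i++];
        }
        while (j < rightRun.length) {
            array[k++] = rightRun[j++];
        }
    }

    public static void main(String[] args) {
        int[] array = {6, 5, 3, 1, 8, 7, 2, 4};
        sort(array, 0, array.length - 1);
        System.out.println(Arrays.toString(array)); // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```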
The Fork/Join framework was introduced in Java 7 to make divide and conquer algorithms easy to implement with support for multicore processors. First, look at this ordinary implementation of merge sort for an array of 10,000,000 random integers.
public class SingleThreadArraySort {
    // From Java 7 '_' can be used to separate digits.
    public static final int ARRAY_SIZE = 10_000_000;
    public static void main(String[] args) {
        long startTime;
        long endTime;
        int[] array = createArray(ARRAY_SIZE);
        MergeSort mergeSort = new MergeSort(array);
        // Get the current time before sorting
        startTime = System.currentTimeMillis();
        mergeSort.sort();
        // Get the current time after sorting
        endTime = System.currentTimeMillis();
        System.out.println("Time taken: " + 
           (endTime - startTime) + " millis");
    }
    // createArray method here...
}
class MergeSort {
    private int array[];
    public MergeSort(int[] array) {
        this.array = array;
    }
    public void sort() {
        sort(0, array.length - 1);
    }
    /**
     * Sort the array using divide and conquer algorithm.
     */
    private void sort(int left, int right) {
        if (left < right) {
            int mid = (left + right) / 2;
            sort(left, mid);
            sort(mid + 1, right);
            merge(left, mid, right);
        }
    }
    // merge method here...
}
Find the complete source code on GitHub.

In this code, the program does not create any threads beyond the default main thread, so only one thread is used to sort the array. With only one thread, you cannot take advantage of a multicore processor. On my computer it took 22,568 milliseconds to complete the sort. A screenshot of the System Monitor is attached below.


In this screenshot, the period from 25 to 50 seconds is the execution of the SingleThreadArraySort program. Notice that even though all the cores of my processor are used at some point, only one core is busy sorting at any given time. The work moves from one core to another because of time sharing among threads, which will be covered in another article (it is another deep concept and I do not want to complicate this article even more).

Now have a look at the following code, which is explained later in this article. Notice that the main method has a ForkJoinPool object, which is used to start the execution. The MergeSort class is a subclass of the RecursiveAction class. The sort(int left, int right) method has been replaced by the compute() method, and since compute() takes no parameters, the left and right values are passed through the constructor and stored as instance variables. In the compute() method, new MergeSort objects are created to sort the sub arrays. The invokeAll method executes those sub MergeSort tasks and waits for them; once they have completed, the merge happens.
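import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinArraySort {
    // Same array size as the single-threaded version.
    public static final int ARRAY_SIZE = 10_000_000;

    public static void main(String[] args) {
        // Create a pool of threads
        ForkJoinPool pool = new ForkJoinPool();
        int[] array = createArray(ARRAY_SIZE);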
        long startTime;
        long endTime;
        MergeSort mergeSort = 
            new MergeSort(array, 0, array.length - 1);
        startTime = System.currentTimeMillis();
        // Start execution and wait for result/return
        pool.invoke(mergeSort);
        endTime = System.currentTimeMillis();
        System.out.println("Time taken: " + 
              (endTime - startTime) + " millis");
    }
    // createArray method here...
}

/**
 * Extends RecursiveAction.
 * Notice that the compute method does not return anything.
 */
class MergeSort extends RecursiveAction {
    private int array[];
    private int left;
    private int right;

    public MergeSort(int[] array, int left, int right) {
        this.array = array;
        this.left = left;
        this.right = right;
    }

    /**
     * Inherited from RecursiveAction.
     * Compare it with the run method of a Thread.
     */
    @Override
    protected void compute() {
        if (left < right) {
            int mid = (left + right) / 2;
            RecursiveAction leftSort = 
                new MergeSort(array, left, mid);
            RecursiveAction rightSort = 
                new MergeSort(array, mid + 1, right);
            invokeAll(leftSort, rightSort);
            merge(left, mid, right);
        }
    }
    // merge method here...
}
Find the complete source code on GitHub.


This code took 13,224 milliseconds, and the screenshot of the System Monitor is attached here. Notice that during the execution period (25 to 40 seconds) all the CPU cores are at 100% utilization. That means every core of the multicore processor is kept busy with the sort.

The Java Fork/Join framework creates a thread pool in the background and uses those threads to execute the assigned tasks. By default, the number of threads in the pool equals the number of processors available to the JVM. A thread whose task is waiting for other tasks to complete does not sit idle; it picks up and executes another pending task instead. Each worker thread keeps its own queue of tasks, and a thread that runs out of work takes tasks from the queues of busy threads. This technique is called work stealing.

For example, assume that "Thread A" starts sorting {6, 5, 3, 1, 8, 7, 2, 4}. Inside the compute() method it divides the problem into two sub tasks: sorting {6, 5, 3, 1} and sorting {8, 7, 2, 4}. The invokeAll() method makes those sub tasks available to the pool, so idle threads such as B and C can pick them up. Until both halves are sorted, the task running on Thread A cannot proceed to the merge. During that period Thread A is used for other sub tasks; for example, it might run the sub task that sorts {6, 5, 3, 1} itself, or one of the sub tasks created further down. In this way Java gets good performance out of a limited number of threads.

Wait... once the sub tasks are completed (in our example, when the arrays {6, 5, 3, 1} and {8, 7, 2, 4} are sorted as {1, 3, 5, 6} and {2, 4, 7, 8}), there is a pending action: merging. Who is going to handle it? The answer is the same thread that started that specific task. In this example, once the sorting of {6, 5, 3, 1} and {8, 7, 2, 4} is completed, Thread A merges them. To get a better idea, fix this in your mind: in a Fork/Join implementation, TASKS wait for SUB-TASKS to complete, not the threads.
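To see how few threads serve how many tasks, here is a small sketch that splits a range the same way the merge sort does and records which threads ran the tasks. The class names WorkerThreadDemo and Split and the helper run are hypothetical names of mine; the tasks do no sorting, they only count themselves.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; it only splits ranges and does no real work.
class WorkerThreadDemo {

    static class Split extends RecursiveAction {
        private final int left;
        private final int right;
        private final AtomicInteger tasks; // shared counter of executed tasks
        private final Set<String> threads; // names of the threads that ran a task

        Split(int left, int right, AtomicInteger tasks, Set<String> threads) {
            this.left = left;
            this.right = right;
            this.tasks = tasks;
            this.threads = threads;
        }

        @Override
        protected void compute() {
            tasks.incrementAndGet();
            threads.add(Thread.currentThread().getName());
            if (left < right) {
                int mid = left + (right - left) / 2;
                invokeAll(new Split(left, mid, tasks, threads),
                          new Split(mid + 1, right, tasks, threads));
            }
        }
    }

    // Returns {number of tasks executed, number of distinct threads used}.
    static int[] run(int size) {
        AtomicInteger tasks = new AtomicInteger();
        Set<String> threads = ConcurrentHashMap.newKeySet();
        ForkJoinPool pool = new ForkJoinPool();
        try {
            System.out.println("Parallelism: " + pool.getParallelism());
            pool.invoke(new Split(0, size - 1, tasks, threads));
        } finally {
            pool.shutdown();
        }
        return new int[] {tasks.get(), threads.size()};
    }

    public static void main(String[] args) {
        int[] result = run(100_000);
        System.out.println("Tasks executed: " + result[0]);
        System.out.println("Threads used:   " + result[1]);
    }
}
```

The number of threads printed depends on your machine, but it should stay close to the parallelism of the pool while the number of tasks runs into the hundreds of thousands.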


The pictorial representation shows a real scenario. Thread 1 is used for Task A, which starts two sub tasks, Task A.1 and Task A.2. After that, Task A waits for its sub tasks to complete. During that period Thread 1 executes Task A.1 and another thread, Thread 2, executes Task A.2. This continues until all the tasks have been completed.

Now you might have a question: can't we implement the same concurrent sorting manually, without using the Fork/Join framework? Here is an example.
public class MultiThreadArraySort {
    // From Java 7 '_' can be used to separate digits.
    public static final int ARRAY_SIZE = 10_000_000;

    public static void main(String[] args) {
        int[] array = createArray(ARRAY_SIZE);
        long startTime;
        long endTime;
        MergeSort mergeSort =
            new MergeSort(array, 0, array.length - 1);
        startTime = System.currentTimeMillis();
        mergeSort.start();
        try {
            // waiting for other threads to complete
            mergeSort.join();  
        } catch (InterruptedException ex) {}
        endTime = System.currentTimeMillis();
        System.out.println("Time taken: " + (endTime -
              startTime) + " millis");
    }
    // createArray method here...
}

/**
 * MergeSort is a sub class of Thread.
 */
class MergeSort extends Thread {
    private int array[];
    private int left;
    private int right;

    public MergeSort(int[] array, int left, int right) {
        this.array = array;
        this.left = left;
        this.right = right;
    }

    @Override
    public void run() {
        if (left < right) {
            int mid = (left + right) / 2;
            // Create two new threads
            Thread leftSort =
              new MergeSort(array, left, mid);
            Thread rightSort =
              new MergeSort(array, mid + 1, right);
            // Start the threads
            leftSort.start();
            rightSort.start();
            // Wait until left thread ends
            try {
                leftSort.join();    
            } catch (InterruptedException ex) {}
            // Wait until right thread ends
            try {
                rightSort.join();
            } catch (InterruptedException ex) {}
            merge(left, mid, right);
        }
    }
    // merge method here...
}
Find the complete source code on GitHub.

In the above example a new Thread is created to sort every sub array. When I compiled and ran the program, it took... it took... I am really sorry to say this: the actual output was thousands of OutOfMemoryError messages (in fact, I finally closed the terminal to terminate the application). Look at the graph of memory usage while this application was running.


In the Fork/Join framework there is a limited number of threads in the pool. ForkJoinPool creates threads based on the number of cores in the processor; on my computer with 4 cores, the thread pool contained four threads. The MultiThreadArraySort code, however, creates a new thread for every sub array. For an array of 10,000,000 elements it has to create 19,999,999 threads (2n - 1 for n elements: one per single element plus one for every merge). Remember that these threads are not shared between tasks, and we cannot easily implement such thread sharing by ourselves. So even if the program ran successfully, by the end it would have created 19,999,999 threads, most of them just waiting for other threads to complete. Just imagine the load and the performance problems we would face. Yes, threads are awesome, but there is a limit: if you keep increasing the number of threads beyond a certain level, they decrease the performance. Now I hope you understand the need for the Fork/Join framework.
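The thread count above can be checked with a few lines of arithmetic. The sketch below follows the same split as MultiThreadArraySort (mid = (left + right) / 2) and counts one thread per range; the class name TaskCount and the method tasks are hypothetical names of mine.

```java
// Hypothetical helper that counts the threads created by MultiThreadArraySort.
class TaskCount {

    // Threads needed for a range of n elements: one for the range itself,
    // plus the threads needed for its two halves.
    static long tasks(long n) {
        if (n <= 1) {
            return 1; // a single element still gets its own thread
        }
        long leftSize = (n + 1) / 2; // elements in left..mid
        long rightSize = n / 2;      // elements in mid+1..right
        return 1 + tasks(leftSize) + tasks(rightSize);
    }

    public static void main(String[] args) {
        System.out.println(tasks(8));          // 15
        System.out.println(tasks(10_000_000)); // 19999999
    }
}
```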

Now it is time to look at the Java code and learn how to use the Fork/Join framework. You need a ForkJoinPool to start the execution.
A ForkJoinPool is constructed with a given target parallelism level; by default, equal to the number of available processors. The pool attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others.
It has three important methods.
  • execute(ForkJoinTask): Arrange asynchronous execution
  • invoke(ForkJoinTask): Await and obtain the result
  • submit(ForkJoinTask): Arrange execution and obtain a Future
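The three methods can be compared side by side with a small sketch. The task RangeSum and the helper sumThreeWays are hypothetical names of mine; RangeSum is a RecursiveTask, a task type that returns a result, which is described in the next paragraph.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class comparing invoke, submit and execute.
class PoolMethodsDemo {

    // Sums the integers from 'from' to 'to' (inclusive) by splitting the range.
    static class RangeSum extends RecursiveTask<Long> {
        private final int from;
        private final int to;

        RangeSum(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (from == to) {
                return (long) from;
            }
            int mid = from + (to - from) / 2;
            RangeSum left = new RangeSum(from, mid);
            RangeSum right = new RangeSum(mid + 1, to);
            left.fork();
            right.fork();
            return left.join() + right.join();
        }
    }

    // Returns the same sum obtained through invoke, submit and execute.
    static long[] sumThreeWays(int from, int to) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            // invoke: blocks until the task completes and returns its result.
            long viaInvoke = pool.invoke(new RangeSum(from, to));

            // submit: returns at once with a ForkJoinTask, which is also a Future.
            ForkJoinTask<Long> future = pool.submit(new RangeSum(from, to));
            long viaSubmit = future.get();

            // execute: returns nothing, so keep the task to read its result.
            RangeSum task = new RangeSum(from, to);
            pool.execute(task);
            long viaExecute = task.join();

            return new long[] {viaInvoke, viaSubmit, viaExecute};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sumThreeWays(1, 100))); // [5050, 5050, 5050]
    }
}
```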
A task must be a subclass of the abstract class java.util.concurrent.ForkJoinTask<V>, and it is recommended to create tasks from two of its sub classes, RecursiveAction and RecursiveTask. Extend RecursiveAction if the sub tasks do not return any output; if an output is expected from the sub tasks, extend RecursiveTask. In the previous merge sort program, RecursiveAction is the super class of MergeSort since the sub tasks do not return any output. The following example counts the number of occurrences of a given integer in an array. Here the sub tasks must return an integer, so RecursiveTask is used as the super class of Counter.
import java.util.Random;
import java.util.concurrent.*;

public class SearchArray {
    public static void main(String[] args) {
        int[] array = {5, 6, 3, 2, 8, 1, 5, 5, 3, 9, 7, 5, 2, 4};
        // Search for 5
        Counter counter =
            new Counter(5, array, 0, array.length - 1);
        // Create a pool of threads
        ForkJoinPool pool = new ForkJoinPool();
        // Start execution and wait for result
        int count = pool.invoke(counter);
        System.out.println("Count: " + count); // Count: 4
    }
}

class Counter extends RecursiveTask<Integer> {
    // The number to search
    private int num;
    private int[] array;
    private int left;
    private int right;

    public Counter(int num, int[] array, int left, int right) {
        this.num = num;
        this.array = array;
        this.left = left;
        this.right = right;
    }

    @Override
    public Integer compute() {
        if (left == right) {
            // Check the number
            if (array[left] == num) {
                return 1;
            }
        } else if (left < right) {
            int mid = (left + right) / 2;
            // Create sub tasks
            RecursiveTask<Integer> leftTask =
                      new Counter(num, array, left, mid);
            RecursiveTask<Integer> rightTask = 
                      new Counter(num, array, mid + 1, right);
            // Execute the sub tasks
            leftTask.fork();
            rightTask.fork();
            int count = 0;
            // Wait and receive the outputs from sub tasks
            count += leftTask.join();
            count += rightTask.join();
            return count;
        }
        return 0;
    }
}
The sub classes must override the compute() method; depending on the super class, its return type is either void (RecursiveAction) or the result type of the task (RecursiveTask<V>). Inside that method you create sub tasks as required and start them using a suitable method. In the merge sort, the invokeAll method is used to start the sub tasks since no output is needed; invokeAll starts the sub tasks and returns only after they have completed. The SearchArray example, on the other hand, uses the fork and join methods: fork schedules a task for asynchronous execution, and join returns the output of the sub task. If the sub task has not completed yet, the parent task waits at the join call.
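The two styles can also be combined. As a sketch, here is the same counting task written with invokeAll instead of two fork calls; the class name InvokeAllCounter is hypothetical. Since invokeAll returns only after both sub tasks are done, the join calls that follow return the results without any further waiting.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant of Counter that starts its sub tasks with invokeAll.
class InvokeAllCounter extends RecursiveTask<Integer> {
    private final int num; // the number to search for
    private final int[] array;
    private final int left;
    private final int right;

    InvokeAllCounter(int num, int[] array, int left, int right) {
        this.num = num;
        this.array = array;
        this.left = left;
        this.right = right;
    }

    @Override
    protected Integer compute() {
        if (left > right) {
            return 0; // empty range
        }
        if (left == right) {
            return array[left] == num ? 1 : 0;
        }
        int mid = left + (right - left) / 2;
        InvokeAllCounter leftTask = new InvokeAllCounter(num, array, left, mid);
        InvokeAllCounter rightTask = new InvokeAllCounter(num, array, mid + 1, right);
        // Start both sub tasks and return only when both have completed.
        invokeAll(leftTask, rightTask);
        // Both sub tasks are finished, so join() just hands back their results.
        return leftTask.join() + rightTask.join();
    }

    public static void main(String[] args) {
        int[] array = {5, 6, 3, 2, 8, 1, 5, 5, 3, 9, 7, 5, 2, 4};
        ForkJoinPool pool = new ForkJoinPool();
        int count = pool.invoke(new InvokeAllCounter(5, array, 0, array.length - 1));
        System.out.println("Count: " + count); // Count: 4
        pool.shutdown();
    }
}
```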

In the Fork/Join framework, tasks should wait only through the join method. Programmers are advised not to implement their own waiting mechanisms such as synchronization or Object.wait. In addition, the compute method should not perform blocking input/output (I/O) operations such as accessing a file or the Internet. The reason for these restrictions is simple. Even though the join method makes the parent task wait, the thread itself does not wait; it gets assigned to another task during that period. If you use synchronization or Object.wait, the thread is blocked and cannot be assigned to another task. In the same way, if a thread accesses a file, the I/O might be blocked by the operating system, so the thread simply has to wait until it completes. This idleness breaks the backbone of the Fork/Join framework, which is reusing threads.

Finally, when should you use the Fork/Join framework? If a problem can be divided into sub problems and those sub problems can be solved simultaneously, the Fork/Join framework is the right choice. However, remember that the Fork/Join framework can improve performance only if the application runs on a multicore processor. If you have any doubts or suggestions please comment below, and for more details read the article "Fork and Join: Java Can Excel at Painless Parallel Programming Too!" on Oracle Technology Network.

Find the source code on GitHub.