Managing Concurrent Processes

Fork/Join Framework

Sometimes it is not clear at the start of a process how many threads and tasks will be needed. When a task gets too complex, the fork/join framework allows it to be split into multiple smaller subtasks.

The fork/join framework relies on the concept of recursion to solve complex tasks.

Applying the fork/join framework requires the developer to perform three steps:

  1. Create a ForkJoinTask
  2. Create the ForkJoinPool
  3. Start the ForkJoinTask

The first step is the most complex, as it requires defining the recursive process. The second and third steps are easy and can each be completed with a single line of code. A fork/join solution is based on extending one of two abstract classes, RecursiveAction and RecursiveTask, both of which extend the ForkJoinTask class. RecursiveAction requires implementing the compute() method, which returns void, to perform the bulk of the work. The second class, RecursiveTask, is generic and requires implementing a compute() method that returns a value of the generic type. The difference between RecursiveAction and RecursiveTask resembles the difference between Runnable and Callable, respectively.
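To contrast with RecursiveAction, here is a minimal sketch of a RecursiveTask that returns a value, with the three steps labeled in comments. The class name TotalWeightTask, the total() helper and the threshold of 3 are hypothetical, chosen only for illustration; the task adds up an array of weights by splitting it in half until each range is small enough to sum directly.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: returns the sum of weights[start, end).
public class TotalWeightTask extends RecursiveTask<Double> {
    private static final int THRESHOLD = 3; // assumed size of a "small enough" range
    private final double[] weights;
    private final int start;
    private final int end;

    public TotalWeightTask(double[] weights, int start, int end) {
        this.weights = weights;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Double compute() {
        if (end - start <= THRESHOLD) {
            // Base case: sum the small range directly.
            double sum = 0;
            for (int i = start; i < end; i++) {
                sum += weights[i];
            }
            return sum;
        }
        // Recursive case: run both halves, then combine their results.
        int middle = start + (end - start) / 2;
        TotalWeightTask left = new TotalWeightTask(weights, start, middle);
        TotalWeightTask right = new TotalWeightTask(weights, middle, end);
        invokeAll(left, right);              // returns once both halves are done
        return left.join() + right.join();   // join() now just returns each result
    }

    public static double total(double[] weights) {
        // Step 1: create the ForkJoinTask.
        TotalWeightTask task = new TotalWeightTask(weights, 0, weights.length);
        // Step 2: create the ForkJoinPool.
        ForkJoinPool pool = new ForkJoinPool();
        // Step 3: start the task; invoke() waits and returns compute()'s result.
        double result = pool.invoke(task);
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        double[] weights = {8, 3, 7, 5, 3, 3, 5, 7, 3, 8, 9, 6};
        System.out.println("Total weight: " + total(weights)); // prints "Total weight: 67.0"
    }
}
```

Unlike compute() in a RecursiveAction, this compute() returns a value, so partial sums flow back up the recursion and pool.invoke() hands the final total to the caller, much as Callable.call() returns a result where Runnable.run() does not.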

Assume that the janitors cleaning the playground have to weigh all the litter gathered in trash cans before it is picked up. There are 12 trash cans in total, and because the litter is picked up on a fixed schedule, the janitors are short of time: each janitor can weigh at most 3 cans. The recursive process in this case divides the set of 12 trash cans into two sets of 6 trash cans, then keeps subdividing until each janitor has at most 3 trash cans to weigh. Reaching a set of at most 3 cans is the base case, where the recursion terminates; dividing a larger set of trash cans in half is the recursive case. Example code is shown below:
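Before adding threads, it can help to trace the recursion on its own. The sketch below is a plain sequential recursive method (no fork/join involved); the class SplitTrace and its methods are hypothetical helpers that record which ranges of trash cans reach the base case when 12 cans are halved until each range holds at most 3.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists the [start, end) ranges that reach the base case
// when the cans are halved until each range holds at most 3 cans.
public class SplitTrace {
    static void split(int start, int end, List<String> baseCases) {
        if (end - start <= 3) {
            // Base case: one janitor weighs these cans; recursion stops here.
            baseCases.add("[" + start + ", " + end + ")");
            return;
        }
        // Recursive case: divide the range into two halves.
        int middle = start + (end - start) / 2;
        split(start, middle, baseCases);
        split(middle, end, baseCases);
    }

    public static List<String> ranges(int cans) {
        List<String> baseCases = new ArrayList<>();
        split(0, cans, baseCases);
        return baseCases;
    }

    public static void main(String[] args) {
        System.out.println(ranges(12)); // prints [[0, 3), [3, 6), [6, 9), [9, 12)]
    }
}
```

Four ranges reach the base case, so four janitors each weigh 3 cans; the fork/join version runs these same ranges as concurrent subtasks instead of one after another.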

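import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ThreadLocalRandom;

public class WeighLitterAction extends RecursiveAction {
    private final int start;        // first trash can in this range (inclusive)
    private final int end;          // last trash can in this range (exclusive)
    private final Double[] weights; // shared array; each task writes only its own range

    public WeighLitterAction(Double[] weights, int start, int end) {
        this.start = start;
        this.end = end;
        this.weights = weights;
    }

    @Override
    protected void compute() {
        if (end - start <= 3) {
            // Base case: one janitor weighs at most 3 trash cans.
            for (int i = start; i < end; i++) {
                weights[i] = (double) ThreadLocalRandom.current().nextInt(10);
                System.out.println("Litter bag weighed: " + i);
            }
        } else {
            // Recursive case: split the range in half and weigh both halves concurrently.
            int middle = start + ((end - start) / 2);
            System.out.println("[start=" + start + ", middle=" + middle + ", end=" + end + "]");
            invokeAll(new WeighLitterAction(weights, start, middle),
                      new WeighLitterAction(weights, middle, end));
        }
    }

    public static void main(String[] args) {
        Double[] weights = new Double[12];

        ForkJoinTask<?> task = new WeighLitterAction(weights, 0, weights.length);
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(task); // blocks until every subtask has finished

        System.out.println();
        System.out.println("Weights:");
        Arrays.stream(weights).forEach(
                d -> System.out.print(d.intValue() + " ")
        );
    }
}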
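invokeAll() hides the individual fork and join operations the framework is named after. The sketch below is a hypothetical variant (class WeighLitterForkJoin and helper weighAll() are illustrative names, not part of the example above) that forks the left half, weighs the right half in the current thread, and then joins the left half. The printing is left out to keep the focus on the split.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical variant of WeighLitterAction that uses fork()/join() directly.
public class WeighLitterForkJoin extends RecursiveAction {
    private final Double[] weights;
    private final int start;
    private final int end;

    public WeighLitterForkJoin(Double[] weights, int start, int end) {
        this.weights = weights;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= 3) {
            // Base case: this janitor weighs at most 3 cans.
            for (int i = start; i < end; i++) {
                weights[i] = (double) ThreadLocalRandom.current().nextInt(10);
            }
        } else {
            int middle = start + (end - start) / 2;
            WeighLitterForkJoin left = new WeighLitterForkJoin(weights, start, middle);
            left.fork();                                              // schedule left half asynchronously
            new WeighLitterForkJoin(weights, middle, end).compute(); // weigh right half in this thread
            left.join();                                              // wait for the left half to finish
        }
    }

    public static Double[] weighAll(int cans) {
        Double[] weights = new Double[cans];
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(new WeighLitterForkJoin(weights, 0, cans));
        pool.shutdown();
        return weights;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(weighAll(12)));
    }
}
```

Computing one half in the current thread instead of forking both avoids handing the pool a task only for the current worker to sit idle waiting on it; join() also guarantees that the weights written by the forked half are visible once it returns.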

One possible output is:

[start=0, middle=6, end=12]
[start=0, middle=3, end=6]
Litter bag weighed: 0
Litter bag weighed: 1
Litter bag weighed: 2
Litter bag weighed: 3
Litter bag weighed: 4
[start=6, middle=9, end=12]
Litter bag weighed: 5
Litter bag weighed: 9
Litter bag weighed: 6
Litter bag weighed: 10
Litter bag weighed: 7
Litter bag weighed: 11
Litter bag weighed: 8

Weights:
8 3 7 5 3 3 5 7 3 8 9 6

The key concept to take away from the example is that the process was started as a single task, and it spawned additional concurrent tasks to split up the work after it had already started. The order of the output is not guaranteed, since some janitors (worker threads) can finish before others.
