Managing Concurrent Processes

What if the workers need to compute the sum of all weight values while weighing the litter? Instead of extending RecursiveAction, the developer should extend the generic RecursiveTask class and calculate and return each partial sum in the compute() method. The following updated implementation uses RecursiveTask<Double>:

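import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class WeighLitterSumAction extends RecursiveTask<Double> {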
    private int start;
    private int end;
    private Double[] weights;

    // Covers the indexes from start (inclusive) to end (exclusive)
    public WeighLitterSumAction(Double[] weights, int start, int end) {
        this.start = start;
        this.end = end;
        this.weights = weights;
    }

    @Override
    protected Double compute() {
        if (end - start <= 3) {
            double sum = 0;
            for (int i = start; i < end; i++) {
                weights[i] = (double)new Random().nextInt(10);
                System.out.println("Litter bag weighed: " + i);
                sum += weights[i];
            }
            return sum;
        } else {
            int middle = start + ((end - start) / 2);
            System.out.println("[start=" + start + ", middle=" + middle + ", end=" + end + "]");
            RecursiveTask<Double> otherTask = new WeighLitterSumAction(weights, start, middle);
            otherTask.fork();
            return new WeighLitterSumAction(weights, middle, end).compute() + otherTask.join();
        }
    }

    public static void main(String[] args) {
        Double[] weights = new Double[12];

        ForkJoinTask<Double> task = new WeighLitterSumAction(weights, 0, weights.length);
        ForkJoinPool pool = new ForkJoinPool();
        Double sum = pool.invoke(task);
        System.out.println("Sum: " + sum);

        System.out.println();
        System.out.println("Weights: ");
        Arrays.stream(weights).forEach(
                d -> System.out.print(d.intValue() + " ")
        );
    }
}

While the base case is mostly unchanged, apart from returning a sum, the recursive case is quite different. Because the invokeAll() method does not return a value, the fork() and join() methods are used to retrieve the results of the recursive subtasks. The fork() method asks the fork/join framework to run the task asynchronously, typically on another worker thread of the pool, while the join() method causes the current thread to wait for the task to finish and returns its result.

In the WeighLitterSumAction example, the [middle, end] range is computed on the current thread, since that thread is already available, and the [start, middle] range is handed off to be computed on a separate thread. The two partial sums are then combined once otherTask has completed. There is also one change in the main method: because pool.invoke(task) returns a value, the result is assigned to the sum variable and printed to the console.
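
The same fork, compute, join pattern can be sketched on a fixed array, so that the result is reproducible instead of depending on random weights. This is only an illustrative sketch: the class name ArraySumTask, the THRESHOLD constant, the sum() helper, and the sample values are assumptions made for this example and are not part of the original program.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class: sums values[start..end) with the fork/join framework.
class ArraySumTask extends RecursiveTask<Double> {
    // Ranges of this size or smaller are summed directly (assumed cut-off).
    private static final int THRESHOLD = 3;

    private final double[] values;
    private final int start; // inclusive
    private final int end;   // exclusive

    ArraySumTask(double[] values, int start, int end) {
        this.values = values;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Double compute() {
        if (end - start <= THRESHOLD) {
            // Base case: sum the small range on the current thread.
            double sum = 0;
            for (int i = start; i < end; i++) {
                sum += values[i];
            }
            return sum;
        }
        int middle = start + (end - start) / 2;
        ArraySumTask left = new ArraySumTask(values, start, middle);
        left.fork();                                   // schedule the left half asynchronously
        double rightSum = new ArraySumTask(values, middle, end).compute(); // right half on this thread
        return rightSum + left.join();                 // wait for the left half and combine
    }

    // Convenience helper (hypothetical): runs the task in a new pool and returns the total.
    static double sum(double[] values) {
        return new ForkJoinPool().invoke(new ArraySumTask(values, 0, values.length));
    }

    public static void main(String[] args) {
        double[] weights = {4, 7, 1, 9, 3, 5, 8, 2, 6, 0, 5, 4};
        System.out.println("Sum: " + sum(weights)); // prints "Sum: 54.0"
    }
}
```

Because invoke() returns whatever the root task's compute() returns, the caller gets the combined total without sharing any mutable state between the subtasks.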

One thing to be careful about when using the fork() and join() methods is the order in which they are called. The previous example was multi-threaded, but the following variation performs like a single-threaded program:

RecursiveTask<Double> otherTask = new WeighLitterSumAction(weights, start, middle);
Double otherResult = otherTask.fork().join();
return new WeighLitterSumAction(weights, middle, end).compute() + otherResult;

In this example, the current thread calls join() immediately after fork(), so it waits for the [start, middle] subtask to finish before starting on the [middle, end] subtask. For the two subtasks to run in parallel, fork() must be called before the current thread begins its own subtask, and join() must be called only after the current thread has finished that subtask.
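
One way to observe the effect of this ordering is a small timing experiment in which every leaf task simply sleeps. The sketch below is hypothetical: the names ForkJoinOrderDemo, SlowCount, and LEAF_MILLIS, the 100 ms delay, and the pool size of 4 are all assumptions chosen for illustration, and the measured times will vary from machine to machine.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo comparing "fork, compute, join" with "fork().join(), compute".
class ForkJoinOrderDemo {
    static final long LEAF_MILLIS = 100; // simulated work per leaf (assumed value)

    // Counts the leaves in [start, end); each leaf sleeps to simulate slow work.
    static class SlowCount extends RecursiveTask<Integer> {
        private final int start;
        private final int end;
        private final boolean joinImmediately;

        SlowCount(int start, int end, boolean joinImmediately) {
            this.start = start;
            this.end = end;
            this.joinImmediately = joinImmediately;
        }

        @Override
        protected Integer compute() {
            if (end - start <= 1) {
                try {
                    Thread.sleep(LEAF_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return end - start;
            }
            int middle = start + (end - start) / 2;
            SlowCount left = new SlowCount(start, middle, joinImmediately);
            SlowCount right = new SlowCount(middle, end, joinImmediately);
            if (joinImmediately) {
                // Waits for the left half before the right half even starts.
                int leftResult = left.fork().join();
                return right.compute() + leftResult;
            }
            // Left half runs asynchronously while this thread handles the right half.
            left.fork();
            return right.compute() + left.join();
        }
    }

    // Runs four leaves in a pool of four workers and returns the elapsed milliseconds.
    static long timeMillis(boolean joinImmediately) {
        ForkJoinPool pool = new ForkJoinPool(4);
        long begin = System.nanoTime();
        pool.invoke(new SlowCount(0, 4, joinImmediately));
        long elapsed = (System.nanoTime() - begin) / 1_000_000;
        pool.shutdown();
        return elapsed;
    }

    public static void main(String[] args) {
        System.out.println("fork, compute, join: " + timeMillis(false) + " ms");
        System.out.println("fork().join(), compute: " + timeMillis(true) + " ms");
    }
}
```

With the immediate join, the four leaves run strictly one after another, so the second measurement cannot be shorter than roughly four leaf delays. With the fork-first ordering, the leaves can overlap when the pool has idle workers, so the first measurement is usually much shorter.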
