Working with Parallel Streams

Streams were introduced in Java 8. One of the powerful features of the Streams API is its built-in support for concurrency. A serial stream processes its elements one at a time, in order. A parallel stream can process its elements concurrently, using multiple threads. By default, parallel streams run on the common ForkJoinPool, whose parallelism is derived from the number of available CPUs: in the standard JDK it is one less than Runtime.getRuntime().availableProcessors(), because the calling thread also takes part in the work. To change the thread count, a developer can set the java.util.concurrent.ForkJoinPool.common.parallelism system property or run the stream pipeline inside a custom ForkJoinPool.
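A minimal sketch of both points, assuming a standard JDK: it prints the processor count and the common pool's parallelism, then runs a parallel sum inside a dedicated ForkJoinPool. The class name ParallelismInfo and the helper sumInCustomPool are illustrative, not part of any API. Running a parallel stream inside a custom pool relies on how the JDK currently schedules forked tasks rather than on a documented guarantee, so treat it as a workaround. The system property alternative (for example -Djava.util.concurrent.ForkJoinPool.common.parallelism=8) only takes effect if it is set before the common pool is first used.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class ParallelismInfo {

    // Runs a parallel sum of 1..n inside a dedicated pool of the given size.
    // Assumption: relies on JDK behavior where tasks forked from a pool's
    // worker thread stay in that pool instead of going to the common pool.
    static long sumInCustomPool(int threads, int n) {
        ForkJoinPool pool = new ForkJoinPool(threads);
        try {
            return pool.submit(
                    () -> IntStream.rangeClosed(1, n).parallel().asLongStream().sum())
                .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Available processors: "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        System.out.println("Sum in custom pool: " + sumInCustomPool(4, 1000)); // 500500
    }
}
```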

There are two ways to create a parallel stream:

  • Create a parallel stream from an existing stream. Calling the parallel() method on an existing stream converts it to one that supports multi-threaded processing:
        Stream<Integer> stream = Arrays.asList(1,2,3,4,5,6).stream();
        Stream<Integer> parallelStream = stream.parallel();
    parallel() is an intermediate operation: it returns an equivalent stream that is parallel, which may be the original stream itself.
  • Create a parallel stream from a Java collection. The Collection interface includes the parallelStream() method, which can be called on any collection and returns a possibly parallel stream (the specification allows it to return a sequential one):
        Stream<Integer> parallelStream = Arrays.asList(1,2,3,4,5,6).parallelStream();
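The two approaches can be compared in a short, self-contained sketch. The class and method names are illustrative; isParallel() reports whether each resulting stream is parallel.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class CreateParallelStreams {

    // Way 1: convert an existing serial stream with parallel()
    static Stream<Integer> fromExistingStream(List<Integer> data) {
        Stream<Integer> stream = data.stream(); // serial by default
        return stream.parallel();               // intermediate operation
    }

    // Way 2: ask the collection for a parallel stream directly
    static Stream<Integer> fromCollection(List<Integer> data) {
        return data.parallelStream();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(numbers.stream().isParallel());            // false
        System.out.println(fromExistingStream(numbers).isParallel()); // true
        System.out.println(fromCollection(numbers).isParallel());     // true
    }
}
```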

The isParallel() method, available on every Stream, can be used to test whether a stream supports parallel processing. Some operations on streams preserve the parallel attribute, while others do not. For example, Stream.concat(Stream s1, Stream s2) returns a parallel stream if either s1 or s2 is parallel. On the other hand, flatMap() does not carry over the parallel attribute of the streams returned by its mapping function: each of those inner streams is consumed sequentially, even if it was created as a parallel stream. The stream returned by flatMap() itself is parallel only if the stream it was called on is parallel.
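A sketch that checks these rules with isParallel(); the class and method names are illustrative. None of the streams needs to be consumed, since isParallel() does not require a terminal operation.

```java
import java.util.stream.Stream;

public class ParallelAttribute {

    // concat() is parallel if either input is parallel
    static boolean concatIsParallel() {
        Stream<Integer> serial = Stream.of(1, 2, 3);
        Stream<Integer> parallel = Stream.of(4, 5, 6).parallel();
        return Stream.concat(serial, parallel).isParallel();
    }

    // Parallel inner streams do not make the outer pipeline parallel
    static boolean flatMapOnSerialIsParallel() {
        return Stream.of(1, 2, 3)
                .flatMap(i -> Stream.of(i, i * 10).parallel())
                .isParallel();
    }

    // The outer pipeline keeps its own parallel attribute through flatMap()
    static boolean flatMapOnParallelIsParallel() {
        return Stream.of(1, 2, 3).parallel()
                .flatMap(i -> Stream.of(i, i * 10))
                .isParallel();
    }

    public static void main(String[] args) {
        System.out.println(concatIsParallel());            // true
        System.out.println(flatMapOnSerialIsParallel());   // false
        System.out.println(flatMapOnParallelIsParallel()); // true
    }
}
```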

Processing Tasks in Parallel

Let’s look at the following example:
Arrays.asList(1,2,3,4,5,6)
        .parallelStream()
        .forEach(s -> System.out.print(s + " "));

The output order is unpredictable because forEach() is applied to the elements concurrently, on multiple threads. Calling forEach() on a parallel stream is conceptually similar to submitting one Runnable lambda expression per element to a pooled thread executor. The Stream API includes an alternative to forEach() called forEachOrdered(), which forces a parallel stream to process the results in encounter order, at the cost of some performance. For instance, the following code snippet

Arrays.asList(1,2,3,4,5,6)
      .parallelStream()
      .forEachOrdered(s -> System.out.print(s + " "));

outputs the results in the expected order: 1 2 3 4 5 6. This method is useful in parts of an application that accept both serial and parallel streams but must process the results in a particular order. Because forEachOrdered() is a terminal operation, nothing follows it in the pipeline; the intermediate operations that precede it (for example, map() or filter()) can still benefit from running on a parallel stream.
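To make the difference testable rather than visual, the sketch below records the elements in a synchronized list instead of printing them. The class and method names are illustrative. With forEach() the list always contains all six elements, but their order can vary from run to run; with forEachOrdered() it always follows the encounter order of the source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class OrderedOutput {

    // Order of 'seen' depends on thread scheduling
    static List<Integer> withForEach(List<Integer> input) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        input.parallelStream().forEach(seen::add);
        return seen;
    }

    // 'seen' is filled in the encounter order of the source list
    static List<Integer> withForEachOrdered(List<Integer> input) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        input.parallelStream().forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(withForEach(input));        // some permutation of 1..6
        System.out.println(withForEachOrdered(input)); // [1, 2, 3, 4, 5, 6]
    }
}
```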

Parallel streams can improve performance because many stream operations can be executed independently. An operation is independent when processing one element of a stream neither requires nor affects the result for any other element. In the example

Arrays.asList("jackal", "kangaroo", "lemur")
      .parallelStream()
      .map(s -> s.toUpperCase())
      .forEach(System.out::println);

mapping jackal to JACKAL can be done independently of mapping kangaroo to KANGAROO. In other words, multiple elements of the stream can be processed at the same time without changing the results. For this reason, it is very important to avoid stateful lambda expressions. A stateful lambda expression is one whose result depends on any state that might change during the execution of a pipeline. The following example illustrates the problem:

List<Integer> data = Collections.synchronizedList(new ArrayList<>());

Arrays.asList(1,2,3,4,5,6).parallelStream()
      .map(i -> {data.add(i); return i;}) // stateful: writes to a shared list
      .forEachOrdered(i -> System.out.print(i + " "));

System.out.println();
// The order in which map() added elements depends on thread scheduling
for (Integer e : data) {
    System.out.print(e + " ");
}

Sample output of this code snippet using a parallel stream:

1 2 3 4 5 6
2 4 3 5 6 1

The forEachOrdered() method prints the numbers in encounter order, whereas the order of the elements in the data list is unpredictable. The stateful lambda expression, which modifies the data list from multiple threads, produces results that vary from run to run. With a serial stream the problem would not have been noticeable, because both lines would print 1 2 3 4 5 6. It is therefore strongly recommended to avoid stateful operations in parallel streams, to eliminate potential side effects. In fact, they should generally be avoided in serial streams wherever possible, since they prevent streams from taking advantage of parallelization.
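A minimal sketch of a stateless alternative to the example above, assuming the goal is simply to obtain the processed elements as a list: instead of adding to a shared list inside map(), let collect() build the list. For an ordered source such as a List, Collectors.toList() preserves encounter order even on a parallel stream, so the result matches the serial one. The class and method names are illustrative, and squaring stands in for real per-element work.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StatelessCollect {

    // No shared mutable state: each element is mapped independently,
    // and collect() assembles the results in encounter order.
    static List<Integer> squares(List<Integer> input) {
        return input.parallelStream()
                .map(i -> i * i)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> result = squares(Arrays.asList(1, 2, 3, 4, 5, 6));
        System.out.println(result); // [1, 4, 9, 16, 25, 36]
    }
}
```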
