Java8 | Concurrency-Parallel Stream

Hasan Kadir Demircan
4 min read · Apr 7, 2023

We discussed concurrency in Java earlier.
Now let's look at how we can use parallel streams in Java.

How can we create a parallel stream?

Stream<Integer> stream = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9).stream();
Stream<Integer> parallelStream = stream.parallel();

System.out.println(parallelStream.count());

Or, more simply:

Stream<Integer> parallelStream = Arrays.asList(1, 2, 3, 4, 5, 6).parallelStream();
System.out.println(parallelStream.count());
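To confirm which mode a stream is in, isParallel() can be queried before the terminal operation. The following is a minimal sketch; the class and method names (ParallelCheck, viaParallel, and so on) are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class ParallelCheck {

    // stream().parallel() switches an existing stream to parallel mode.
    static boolean viaParallel(List<Integer> list) {
        Stream<Integer> s = list.stream().parallel();
        return s.isParallel();
    }

    // parallelStream() asks the collection for a parallel stream directly.
    static boolean viaParallelStream(List<Integer> list) {
        return list.parallelStream().isParallel();
    }

    // sequential() switches the stream back to sequential mode.
    static boolean backToSequential(List<Integer> list) {
        return list.parallelStream().sequential().isParallel();
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(viaParallel(list));
        System.out.println(viaParallelStream(list));
        System.out.println(backToSequential(list));
    }
}
```

Both ways of creating the stream should report true for a list built with Arrays.asList, and the last call should report false.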

What should we pay attention to with parallel streams?

Arrays.asList("jackal", "kangaroo", "lemur").parallelStream().map(s -> {
    System.out.println(s);
    return s.toUpperCase();
}).forEach(System.out::println);
  • For the exam, you should remember that parallel streams can process results independently, although the order of the results cannot be determined ahead of time.

Another example:

List<Integer> data = Collections.synchronizedList(new ArrayList<>());
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8).parallelStream().map(i -> {
    data.add(i); // AVOID STATEFUL LAMBDA EXPRESSIONS!
    return i;
}).forEachOrdered(i -> System.out.print(i + " "));

System.out.println();
System.out.println(data);
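  • You can see that a stateful lambda expression, which modifies the data list in parallel, produces unpredictable results at runtime: forEachOrdered still prints 1 through 8 in order, but the order of the elements in data changes from run to run.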
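One way to avoid the stateful lambda is to let the stream build the result itself with a collector. The sketch below assumes the goal is simply a copy of the processed elements; the names StatelessCollect and copyInParallel are illustrative only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class StatelessCollect {

    // The lambda touches no shared state; the collector assembles the list.
    static List<Integer> copyInParallel(List<Integer> source) {
        return source.parallelStream()
                .map(i -> i)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> result = copyInParallel(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        System.out.println(result); // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```

Because the source list is ordered, Collectors.toList() keeps the encounter order even though the stream runs in parallel.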

Another example:

Stream<String> stream = Stream.of("w", "o", "l", "f").parallel();
Set<String> set = stream.collect(Collectors.toSet());
System.out.println(set);

//[f, w, l, o]

When a parallel stream writes into a collection, we have to be careful not to lose data:

List<Integer> data = new ArrayList<>(); // not thread-safe
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8).parallelStream().map(i -> {
    data.add(i); // AVOID STATEFUL LAMBDA EXPRESSIONS!
    return i;
}).forEachOrdered(i -> System.out.print(i + " "));

System.out.println();
System.out.println(data);

// Possible output:
// 1 2 3 4 5 6 7 8
// [1, 3, 4, 5, 2] -> elements are missing!
  • Anytime you are working with a collection with a parallel stream, it is recommended that you use a concurrent collection.
  • An ArrayList stores its elements in an internal array and is not thread-safe. As the list grows, a new, larger array is periodically allocated and the elements are copied over. If two threads add to the list or trigger a resize at the same time, one write can overwrite the other and a result is lost, producing the unexpected value shown here.
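As a rough illustration of the recommendation above, the sketch below swaps the ArrayList for a CopyOnWriteArrayList, one of the thread-safe lists in java.util.concurrent. The class and method names are invented for this example, and CopyOnWriteArrayList is just one possible choice.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SafeSideEffect {

    // Every add is thread-safe, so no element is lost.
    // The order inside the returned list can still vary between runs.
    static List<Integer> addAllInParallel(List<Integer> source) {
        List<Integer> data = new CopyOnWriteArrayList<>();
        source.parallelStream().forEach(data::add);
        return data;
    }

    public static void main(String[] args) {
        List<Integer> data = addAllInParallel(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        System.out.println(data.size()); // 8
        System.out.println(data);        // all eight values, in some order
    }
}
```

All eight elements arrive, but only their presence is guaranteed, not their position.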

Concurrent collection example:

Stream<String> stream = Stream.of("w", "o", "l", "f").parallel();
SortedSet<String> set = stream.collect(ConcurrentSkipListSet::new, Set::add, Set::addAll);
System.out.println(set);
// [f, l, o, w]
  • A ConcurrentSkipListSet keeps its elements sorted according to their natural ordering.

toConcurrentMap()

Stream<String> stream1 = Stream.of("lions", "tigers", "bears").parallel();
ConcurrentMap<Integer, String> map = stream1
        .collect(Collectors.toConcurrentMap(String::length, k -> k, (s1, s2) -> s1 + "," + s2));

System.out.println(map); // {5=lions,bears, 6=tigers} (the merged value may also be bears,lions)
System.out.println(map.getClass()); // class java.util.concurrent.ConcurrentHashMap
  • ConcurrentMap is an extension of the Map interface. It provides a structure and guidance for reconciling throughput with thread safety.

groupingByConcurrent()

Stream<String> stream1 = Stream.of("lions", "tigers", "bears").parallel();
ConcurrentMap<Integer, List<String>> map = stream1.collect(Collectors.groupingByConcurrent(String::length));
System.out.println(map); // {5=[lions, bears], 6=[tigers]} (the order inside each list may vary)
  • If we want to group the elements of a parallel stream into a ConcurrentMap, we can use groupingByConcurrent().
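groupingByConcurrent() also has an overload that takes a downstream collector. The sketch below uses it with Collectors.counting() to count words per length; the names GroupCount and countByLength are made up for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

class GroupCount {

    // Groups words by length and counts how many words fall into each group.
    static ConcurrentMap<Integer, Long> countByLength(List<String> words) {
        return words.parallelStream()
                .collect(Collectors.groupingByConcurrent(String::length, Collectors.counting()));
    }

    public static void main(String[] args) {
        ConcurrentMap<Integer, Long> counts = countByLength(Arrays.asList("lions", "tigers", "bears"));
        System.out.println(counts.get(5)); // 2
        System.out.println(counts.get(6)); // 1
    }
}
```

Counting does not depend on the order in which the threads deliver elements, so the result is the same on every run.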

Let's look at some basic operations on parallel streams.
unordered()

Arrays.asList(1, 5, 20, 4, 3, 8, 100)
.stream()
.unordered()
.parallel()
.forEach(System.out::println);
  • For serial streams, calling unordered() has no effect, but on parallel streams it can greatly improve the performance of order-sensitive operations such as skip(), limit() and distinct().
  • You should know when to apply unordered() to improve performance.
  • unordered() does not actually reorder the elements.
  • It just tells the JVM that if an order-based stream operation is applied, the order can be ignored.
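The effect of unordered() is easiest to see with an order-based operation such as limit(). In the sketch below (the class and method names are illustrative assumptions), the ordered version must return the first three elements, while the unordered version may return any three.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class UnorderedLimit {

    // Ordered parallel stream: limit(3) must keep the first three elements.
    static List<Integer> firstThree(List<Integer> source) {
        return source.parallelStream().limit(3).collect(Collectors.toList());
    }

    // Unordered parallel stream: limit(3) is free to keep any three elements.
    static List<Integer> anyThree(List<Integer> source) {
        return source.parallelStream().unordered().limit(3).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 5, 20, 4, 3, 8, 100);
        System.out.println(firstThree(source)); // [1, 5, 20]
        System.out.println(anyThree(source));   // three elements of the list, not necessarily the first three
    }
}
```

Dropping the ordering constraint lets the worker threads stop as soon as any three elements are available, which is where the potential speed-up comes from.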

findAny()

Integer findAny1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).stream().findAny().get();
System.out.println(findAny1);
// With parallel streams, the result of findAny() is no longer predictable.
Integer findAny2 = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).parallelStream().findAny().get();
System.out.println(findAny2);
  • When we run the code with a sequential stream:

2023-04-08T00:47:30.348
1
2023-04-08T00:47:30.354

  • When we run the code with parallelStream:

2023-04-08T00:47:30.354
7
2023-04-08T00:47:30.356
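If a predictable result is needed on a parallel stream, findFirst() respects the encounter order where findAny() does not. A small sketch of the difference, with invented class and method names:

```java
import java.util.Arrays;
import java.util.List;

class FindFirstVsFindAny {

    // findFirst() returns the first element in encounter order, even in parallel.
    static int first(List<Integer> list) {
        return list.parallelStream().findFirst().get();
    }

    // findAny() may return any element of the stream.
    static int any(List<Integer> list) {
        return list.parallelStream().findAny().get();
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(first(list)); // 1
        System.out.println(any(list));   // some element of the list
    }
}
```

The price of findFirst() is that the parallel workers have to coordinate on ordering, so findAny() is usually the cheaper call when any element will do.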

forEachOrdered()

Arrays.asList(1, 2, 3, 4, 5, 6, 7).stream().forEach(s -> System.out.print(s + " "));

// With a parallel stream, the results are no longer ordered or predictable.
Arrays.asList(1, 2, 3, 4, 5, 6, 7).parallelStream().forEach(s -> System.out.print(s + " "));
  • If we need the elements in order and also need to use parallelStream, we can use forEachOrdered().
  • In the example above, forEach on a parallelStream does not process the elements in encounter order.
  Arrays.asList(2, 1, 3, 4, 5, 6, 7).parallelStream().forEachOrdered(s -> System.out.print(s + " "));
  • The output is 2 1 3 4 5 6 7, the same order in which we put the elements into the list.
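When the goal is an ordered result rather than ordered printing, a collector can be an alternative to forEachOrdered(). The sketch below assumes we want the elements joined into one string; JoinInOrder and joinInOrder are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class JoinInOrder {

    // Collectors.joining keeps the encounter order of an ordered parallel stream.
    static String joinInOrder(List<Integer> list) {
        return list.parallelStream()
                .map(String::valueOf)
                .collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        System.out.println(joinInOrder(Arrays.asList(2, 1, 3, 4, 5, 6, 7))); // 2 1 3 4 5 6 7
    }
}
```

The mapping step can still run in parallel; only the final assembly follows the encounter order.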

reduce()

String reduced1 = Arrays.asList('w', 'o', 'l', 'f').stream().reduce("-x-",
        (String c, Character s) -> "he" + c + s,
        (String s2, String s3) -> s2 + s3);
System.out.println(reduced1);

String reduced2 = Arrays.asList('w', 'o', 'l', 'f').parallelStream().reduce("",
        (String c, Character s) -> c + s,
        (String s2, String s3) -> s2 + s3);
System.out.println(reduced2);
  • In Java, reduce() is a terminal operation that combines the elements of a stream into a single value.
  • Another example:
System.out.println(Arrays.asList(1,2,3,4,5,6)
.parallelStream()
.reduce(0,(a,b) -> (a-b))); // NOT AN ASSOCIATIVE ACCUMULATOR
  • It may output -21, 3, or some other value, because the accumulator function is not associative.
  • Another example:
System.out.println(Arrays.asList("w", "o", "l", "f").parallelStream().reduce("X", String::concat));
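  • On a parallel stream the output may be XwXoXlXf, because "X" is not a true identity for concatenation and is applied once per chunk. A sequential stream would print Xwolf.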
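For comparison, here is a minimal sketch of reductions that behave the same in parallel and sequential mode, because the identity is a true identity and the accumulator is associative. The class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.List;

class SafeReduce {

    // 0 is the identity for addition, and addition is associative.
    static int sum(List<Integer> numbers) {
        return numbers.parallelStream().reduce(0, Integer::sum);
    }

    // "" is the identity for concatenation, and concatenation is associative.
    static String concat(List<String> parts) {
        return parts.parallelStream().reduce("", String::concat);
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1, 2, 3, 4, 5, 6)));    // 21
        System.out.println(concat(Arrays.asList("w", "o", "l", "f"))); // wolf
    }
}
```

However the stream is split across threads, the partial results combine to the same value.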

Parallel Stream Speed Test

import java.util.ArrayList;
import java.util.List;

class WhaleDataCalculator {

    int processRecord(int input) {
        try {
            Thread.sleep(5); // simulate 5 ms of work per record
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return input + 1;
    }

    public void processAllData(List<Integer> data) {
        // Note: on Java 9+, count() may skip map() when the size is known from the source.
        data.stream().map(a -> processRecord(a)).count();
    }

    public void processAllDataParallel(List<Integer> data) {
        data.parallelStream().map(a -> processRecord(a)).count();
    }
}

public class Test04ParallelStream {

    public static void main(String[] args) {
        WhaleDataCalculator calculator = new WhaleDataCalculator();
        // Define the data
        List<Integer> data = new ArrayList<Integer>();
        for (int i = 0; i < 4000; i++)
            data.add(i);
        // Process the data
        long start = System.currentTimeMillis();
        calculator.processAllData(data);

        // Swap in the parallel version to compare; its running time scales with the number of processors.
        // calculator.processAllDataParallel(data);
        double time = (System.currentTimeMillis() - start) / 1000.0;
        // Report results
        System.out.println("\nTasks completed in: " + time + " seconds");
    }
}
  • We created a basic method and ran it with and without parallelStream.
  • When we call calculator.processAllData(data), the result is: Tasks completed in: 24.826 seconds
  • When we call calculator.processAllDataParallel(data), the result is: Tasks completed in: 3.108 seconds. It's a better output :)
  • As you can see, we have a Thread.sleep() call. You can think of it as the latency of an API call :)
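Since the speed-up depends on the machine, it can help to check how many worker threads are available. The sketch below assumes the standard OpenJDK behaviour, where parallel streams run on the common ForkJoinPool by default; the class and method names are made up.

```java
import java.util.concurrent.ForkJoinPool;

class ParallelismInfo {

    // Number of processors the JVM can use.
    static int processors() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target parallelism of the common ForkJoinPool.
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Processors: " + processors());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

On a machine with more cores, the parallel version of the test above would be expected to finish faster, up to the point where other limits take over.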

Github: here

Thank you for reading :)
