Java8 | Concurrency-Parallel Stream

Hasan Kadir Demircan
4 min read · Apr 7, 2023


We discussed concurrency in Java earlier.
Now let’s look at how we can use parallel streams in Java.

How can we create a parallel stream?

Stream<Integer> stream = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9).stream();
Stream<Integer> parallelStream = stream.parallel();

System.out.println(parallelStream.count());

Or, more simply, call parallelStream() directly on the collection:

Stream<Integer> parallelStream = Arrays.asList(1, 2, 3, 4, 5, 6).parallelStream();
System.out.println(parallelStream.count());
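
To check which mode a stream is in, Stream.isParallel() can be queried before the terminal operation. The following is a minimal sketch; the class and method names (ParallelCreationDemo, viaParallel, viaParallelStream) are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class ParallelCreationDemo {

    // Switches an existing sequential stream to parallel mode with parallel().
    static boolean viaParallel(List<Integer> source) {
        Stream<Integer> stream = source.stream().parallel();
        return stream.isParallel();
    }

    // Asks the collection for a parallel stream directly.
    static boolean viaParallelStream(List<Integer> source) {
        return source.parallelStream().isParallel();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(numbers.stream().isParallel()); // false
        System.out.println(viaParallel(numbers));          // true
        System.out.println(viaParallelStream(numbers));    // true
    }
}
```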

What should we pay attention to with parallel streams?

Arrays.asList("jackal", "kangaroo", "lemur").parallelStream().map(s -> {
System.out.println(s);
return s.toUpperCase();
}).forEach(System.out::println);
  • For the exam, you should remember that parallel streams can process results independently, although the order of the results cannot be determined ahead of time.
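
The unpredictable order applies to side effects such as printing inside map() or forEach(). When the results are gathered with a collector instead, an ordered source such as a List keeps its encounter order even in parallel. A small sketch, with hypothetical names (ParallelOrderDemo, upperCaseAll):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ParallelOrderDemo {

    // map() may run on several threads, yet toList() assembles the results in encounter order.
    static List<String> upperCaseAll(List<String> animals) {
        return animals.parallelStream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(upperCaseAll(Arrays.asList("jackal", "kangaroo", "lemur")));
        // [JACKAL, KANGAROO, LEMUR]
    }
}
```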

Another example:

List<Integer> data = Collections.synchronizedList(new ArrayList<>());
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8).parallelStream().map(i -> {
data.add(i); // AVOID STATEFUL LAMBDA EXPRESSIONS!
return i;
}).forEachOrdered(i -> System.out.print(i + " "));

System.out.println();
System.out.println(data);
  • You can see that a stateful lambda expression, which modifies the data list in parallel, produces unpredictable results at runtime.
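
To make "unpredictable" concrete: with a synchronized list no element is lost, but the insertion order can change from run to run. The sketch below (hypothetical names StatefulLambdaDemo and recordViaSideEffect) shows that only a sorted copy is guaranteed to match the input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class StatefulLambdaDemo {

    // Fills a synchronized list from inside map(): thread-safe, but the insertion order is arbitrary.
    static List<Integer> recordViaSideEffect(List<Integer> source) {
        List<Integer> data = Collections.synchronizedList(new ArrayList<>());
        source.parallelStream()
                .map(i -> { data.add(i); return i; }) // stateful lambda, shown only to illustrate the problem
                .forEachOrdered(i -> { });
        return data;
    }

    public static void main(String[] args) {
        List<Integer> recorded = recordViaSideEffect(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        System.out.println(recorded); // order may differ from run to run

        List<Integer> sorted = new ArrayList<>(recorded);
        Collections.sort(sorted);
        System.out.println(sorted);   // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```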

Another example:

Stream<String> stream = Stream.of("w", "o", "l", "f").parallel();
Set<String> set = stream.collect(Collectors.toSet());
System.out.println(set);

//[f, w, l, o]

When a parallel stream writes to a collection, we must take care not to lose data:

List<Integer> data = new ArrayList<>();
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8).parallelStream().map(i -> {
data.add(i); // AVOID STATEFUL LAMBDA EXPRESSIONS!
return i;
}).forEachOrdered(i -> System.out.print(i + " "));

System.out.println();
System.out.println(data);

// Possible output:
// 1 2 3 4 5 6 7 8
// [1, 3, 4, 5, 2] -> elements are missing!
  • Anytime you are working with a collection with a parallel stream, it is recommended that you use a concurrent collection.
  • An ArrayList is backed internally by a plain array. As the list grows, a new, larger array is periodically required. If two threads both trigger the array to be resized at the same time, a result can be lost, producing the unexpected value shown here.
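
Two ways to avoid the lost elements are sketched below: writing into a concurrent collection, or letting a collector build the list so that no shared mutable state is needed at all. The names (SafeAccumulationDemo, intoConcurrentList, intoCollectedList) are hypothetical, and CopyOnWriteArrayList is just one possible concurrent collection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

class SafeAccumulationDemo {

    // Option 1: a concurrent collection keeps every element, though not in a fixed order.
    static List<Integer> intoConcurrentList(List<Integer> source) {
        List<Integer> data = new CopyOnWriteArrayList<>();
        source.parallelStream().forEach(data::add);
        return data;
    }

    // Option 2: each worker fills its own ArrayList and the collector merges them in encounter order.
    static List<Integer> intoCollectedList(List<Integer> source) {
        return source.parallelStream().collect(Collectors.toCollection(ArrayList::new));
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(intoConcurrentList(source).size()); // 8
        System.out.println(intoCollectedList(source));         // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```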

Concurrent collection example:

Stream<String> stream = Stream.of("w", "o", "l", "f").parallel();
SortedSet<String> set = stream.collect(ConcurrentSkipListSet::new, Set::add, Set::addAll);
System.out.println(set);
// [f, l, o, w]
  • A ConcurrentSkipListSet keeps its elements sorted according to their natural ordering, unless a Comparator is supplied when it is created.
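
If a different order is needed, the set can be created with a Comparator. The sketch below reuses the same three-argument collect() call with a supplier that passes Comparator.reverseOrder(); the class and method names (SkipListOrderDemo, collectReversed) are hypothetical.

```java
import java.util.Comparator;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.Supplier;
import java.util.stream.Stream;

class SkipListOrderDemo {

    // Collects the letters into a skip-list set that sorts in reverse natural order.
    static SortedSet<String> collectReversed(String... letters) {
        Supplier<ConcurrentSkipListSet<String>> factory =
                () -> new ConcurrentSkipListSet<String>(Comparator.<String>reverseOrder());
        return Stream.of(letters).parallel().collect(factory, Set::add, Set::addAll);
    }

    public static void main(String[] args) {
        System.out.println(collectReversed("f", "l", "o", "w")); // [w, o, l, f]
    }
}
```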

toConcurrentMap()

Stream<String> stream1 = Stream.of("lions", "tigers", "bears").parallel();
ConcurrentMap<Integer, String> map = stream1
.collect(Collectors.toConcurrentMap(String::length, k -> k, (s1, s2) -> s1 + "," + s2));

System.out.println(map); // e.g. {5=lions,bears, 6=tigers} (the merge order may vary)
System.out.println(map.getClass()); // java.util.concurrent.ConcurrentHashMap
  • ConcurrentMap is an extension of the Map interface. It aims to provide structure and guidance for reconciling throughput with thread safety.
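
When the merge function is commutative, the result no longer depends on which thread gets there first. The sketch below counts words per length with Integer::sum as the merge function; CountByLengthDemo and countByLength are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

class CountByLengthDemo {

    // Key: word length. Value: 1 per word. Collisions are merged by adding the counts.
    static ConcurrentMap<Integer, Integer> countByLength(List<String> words) {
        return words.parallelStream()
                .collect(Collectors.toConcurrentMap(String::length, w -> 1, Integer::sum));
    }

    public static void main(String[] args) {
        System.out.println(countByLength(Arrays.asList("lions", "tigers", "bears"))); // {5=2, 6=1}
    }
}
```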

groupingByConcurrent()

Stream<String> stream1 = Stream.of("lions", "tigers", "bears").parallel();
ConcurrentMap<Integer, List<String>> map = stream1.collect(Collectors.groupingByConcurrent(String::length));
System.out.println(map); // e.g. {5=[lions, bears], 6=[tigers]} (the order inside a group may vary)
  • To group the elements of a stream into a ConcurrentMap, we can use Collectors.groupingByConcurrent().
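
groupingByConcurrent() also accepts a downstream collector. As a sketch, collecting each group into a TreeSet makes the content of every group deterministic, because the set sorts its elements regardless of arrival order. The names GroupSortedDemo and groupSorted are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

class GroupSortedDemo {

    // Groups words by length; each group is a TreeSet, so it is always sorted.
    static ConcurrentMap<Integer, TreeSet<String>> groupSorted(List<String> words) {
        return words.parallelStream()
                .collect(Collectors.groupingByConcurrent(
                        String::length,
                        Collectors.toCollection(TreeSet::new)));
    }

    public static void main(String[] args) {
        ConcurrentMap<Integer, TreeSet<String>> groups =
                groupSorted(Arrays.asList("lions", "tigers", "bears"));
        System.out.println(groups.get(5)); // [bears, lions]
        System.out.println(groups.get(6)); // [tigers]
    }
}
```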

Let’s look at some basic operations on parallel streams
unordered()

Arrays.asList(1, 5, 20, 4, 3, 8, 100)
.stream()
.unordered()
.parallel()
.forEach(System.out::println);
  • For serial streams, calling unordered() has no effect, but on parallel streams it can greatly improve the performance of order-sensitive operations such as limit(), skip(), and distinct().
  • You should know when to apply an unordered stream to improve performance.
  • The unordered() method does not actually reorder the elements.
  • It just tells the JVM that if an order-based stream operation is applied, the order can be ignored.
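
A small sketch of what "the order can be ignored" means in practice: on an unordered parallel stream, limit(3) is free to return any three elements instead of the first three. Only the size of the result is guaranteed. UnorderedLimitDemo and anyThree are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class UnorderedLimitDemo {

    // Returns three elements of the source; which three is not specified.
    static List<Integer> anyThree(List<Integer> source) {
        return source.stream()
                .unordered()
                .parallel()
                .limit(3)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> picked = anyThree(Arrays.asList(1, 5, 20, 4, 3, 8, 100));
        System.out.println(picked.size()); // 3
        System.out.println(picked);        // some three elements of the source
    }
}
```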

findAny()

Integer findAny1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).stream().findAny().get();
System.out.println(findAny1);
// With parallel streams, the result of findAny() is no longer predictable.
Integer findAny2 = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).parallelStream().findAny().get();
System.out.println(findAny2);
  • When we run the code with a sequential stream, the output is:

2023-04-08T00:47:30.348
1
2023-04-08T00:47:30.354
  • When we run the code with a parallel stream, the output is:
2023-04-08T00:47:30.354
7
2023-04-08T00:47:30.356
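
If a predictable result is required, findFirst() returns the first element in encounter order even on a parallel stream, at the cost of some extra coordination. A short sketch comparing the two; FindDemo, first, and any are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;

class FindDemo {

    // Always the first element in encounter order, even in parallel.
    static int first(List<Integer> numbers) {
        return numbers.parallelStream().findFirst().orElseThrow(NoSuchElementException::new);
    }

    // Some element of the list; which one is not specified.
    static int any(List<Integer> numbers) {
        return numbers.parallelStream().findAny().orElseThrow(NoSuchElementException::new);
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(first(numbers)); // 1
        System.out.println(any(numbers));   // any value from the list
    }
}
```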

forEachOrdered()

Arrays.asList(1, 2, 3, 4, 5, 6, 7).stream().forEach(s -> System.out.print(s + " "));

// With a parallel stream, the results are no longer ordered or predictable.
Arrays.asList(1, 2, 3, 4, 5, 6, 7).parallelStream().forEach(s -> System.out.print(s + " "));
  • If we need the elements in order while still using a parallel stream, we can use forEachOrdered().
  • In the example above, the parallel stream does not process the elements in encounter order.
  Arrays.asList(2, 1, 3, 4, 5, 6, 7).parallelStream().forEachOrdered(s -> System.out.print(s + " "));
  • The output is 2 1 3 4 5 6 7, the same order in which we put the elements into the list.
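
The same guarantee can be checked without reading console output. In the sketch below, forEachOrdered() appends to a StringBuilder in encounter order, and Collectors.joining() is shown as a side-effect-free alternative. The names OrderedOutputDemo, viaForEachOrdered, and viaJoining are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class OrderedOutputDemo {

    // forEachOrdered() performs the action one element at a time, in encounter order.
    static String viaForEachOrdered(List<Integer> numbers) {
        StringBuilder out = new StringBuilder();
        numbers.parallelStream().forEachOrdered(n -> out.append(n).append(' '));
        return out.toString().trim();
    }

    // A collector gives the same ordered result without any side effect.
    static String viaJoining(List<Integer> numbers) {
        return numbers.parallelStream().map(String::valueOf).collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2, 1, 3, 4, 5, 6, 7);
        System.out.println(viaForEachOrdered(numbers)); // 2 1 3 4 5 6 7
        System.out.println(viaJoining(numbers));        // 2 1 3 4 5 6 7
    }
}
```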

reduce()

String reduced1 = Arrays.asList('w', 'o', 'l', 'f').stream()
.reduce("-x-", (String c, Character s) -> "he" + c + s, (String s2, String s3) -> s2 + s3);
System.out.println(reduced1); // hehehehe-x-wolf (sequential, so the combiner is not used)

String reduced2 = Arrays.asList('w', 'o', 'l', 'f').parallelStream()
.reduce("", (String c, Character s) -> c + s, (String s2, String s3) -> s2 + s3);
System.out.println(reduced2); // wolf
  • In Java, reduce() is a terminal operation that combines the elements of a stream into a single value, either an object or a primitive.
  • Another example:
System.out.println(Arrays.asList(1,2,3,4,5,6)
.parallelStream()
.reduce(0,(a,b) -> (a-b))); // NOT AN ASSOCIATIVE ACCUMULATOR
  • It may output -21, 3, or some other value, as the accumulator function violates the associativity property.
  • Another example:
System.out.println(Arrays.asList("w", "o", "l", "f").parallelStream().reduce("X", String::concat));
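  • The output is -> XwXoXlXf (a sequential stream would print Xwolf). "X" is not a true identity value, so it is prepended to every chunk that the parallel stream processes separately.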
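
For comparison, here is a sketch of reductions that stay correct in parallel: each uses a true identity value (0 or the empty string) and an associative function, and anything that is not associative (the subtraction, the "X" prefix) is applied once outside the reduction. The names SafeReduceDemo, negatedSum, prefixed, and totalLength are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

class SafeReduceDemo {

    // 0 - 1 - 2 - ... equals -(1 + 2 + ...): sum in parallel, negate once at the end.
    static int negatedSum(List<Integer> numbers) {
        return -numbers.parallelStream().reduce(0, Integer::sum);
    }

    // Use "" as the identity and add the prefix once, outside the reduction.
    static String prefixed(String prefix, List<String> letters) {
        return prefix + letters.parallelStream().reduce("", String::concat);
    }

    // Three-argument form: the accumulator adds one word, the combiner merges partial totals.
    static int totalLength(List<String> words) {
        return words.parallelStream().reduce(0, (len, w) -> len + w.length(), Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(negatedSum(Arrays.asList(1, 2, 3, 4, 5, 6)));              // -21
        System.out.println(prefixed("X", Arrays.asList("w", "o", "l", "f")));         // Xwolf
        System.out.println(totalLength(Arrays.asList("lions", "tigers", "bears")));   // 16
    }
}
```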

Parallel Stream Speed Test

import java.util.ArrayList;
import java.util.List;

class WhaleDataCalculator {

int processRecord(int input) {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the interrupt flag
}
return input + 1;
}

public void processAllData(List<Integer> data) {
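// Written for Java 8: on Java 9 and later, count() may skip map() when the size is known
// from the source (this also applies to processAllDataParallel).
data.stream().map(a -> processRecord(a)).count();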
}

public void processAllDataParallel(List<Integer> data) {
data.parallelStream().map(a -> processRecord(a)).count();
}
}

public class Test04ParallelStream {

public static void main(String[] args) {
// Create the calculator
WhaleDataCalculator calculator = new WhaleDataCalculator();
// Define the data
List<Integer> data = new ArrayList<Integer>();
for (int i = 0; i < 4000; i++)
data.add(i);
// Process the data
long start = System.currentTimeMillis();
calculator.processAllData(data);

// To test the parallel version, call this instead of processAllData().
// Even better, the results scale with the number of processors.
// calculator.processAllDataParallel(data);
double time = (System.currentTimeMillis() - start) / 1000.0;
// Report results
System.out.println("\nTasks completed in: " + time + " seconds");
}
}
  • We created a basic method and ran it both without and with a parallel stream.
  • When we call calculator.processAllData(data), the result is -> Tasks completed in: 24.826 seconds
  • When we call calculator.processAllDataParallel(data), the result is -> Tasks completed in: 3.108 seconds. That’s a much better result :)
  • As you can see, there is a Thread.sleep() call. You can think of it as calling some API, where Thread.sleep() stands in for the latency of that API :)
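
A compact variant of the same measurement is sketched below. It is not the code from the repository: the names (ParallelTimingSketch, run) and the 400-record data size are assumptions, and the mapped values are summed so that every record is forced through processRecord() on any Java version. The timings depend on the machine, so no expected output is given.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelTimingSketch {

    // Simulates a slow call (for example a remote API) with a fixed 5 ms latency.
    static int processRecord(int input) {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return input + 1;
    }

    // Summing the mapped values forces every element through processRecord().
    static long run(List<Integer> data, boolean parallel) {
        return (parallel ? data.parallelStream() : data.stream())
                .mapToLong(ParallelTimingSketch::processRecord)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.range(0, 400).boxed().collect(Collectors.toList());

        long start = System.nanoTime();
        long sequentialSum = run(data, false);
        long sequentialMs = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        long parallelSum = run(data, true);
        long parallelMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("Sequential: " + sequentialMs + " ms, sum = " + sequentialSum);
        System.out.println("Parallel:   " + parallelMs + " ms, sum = " + parallelSum);
    }
}
```

Both runs must produce the same sum; only the elapsed time should differ.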

Github: here

Thank you for reading :)
