Reputation: 11
I want to run a Java stream in which the first element is filtered in parallel with the second and third elements. The second and third elements cannot be filtered in parallel with each other, because they use a shared resource.
int numberOfFilteredElements = Stream.of(firstElement, secondElement, thirdElement)
    .filter(element -> element.someFilter())
    .collect(Collectors.toSet())
    .size();
A Java stream pipeline is either parallel or sequential as a whole. A stream built with Stream.concat is parallel if either of its input streams is parallel, so I cannot create one parallel and one sequential stream, concatenate them, and keep their separate execution modes.
Is there another way to solve this with streams? Or do I need to use locks such as ReentrantLock in someFilter() and run the stream in parallel for all elements?
Upvotes: 1
Views: 176
Reputation: 2776
We can put the first element in Stream #1 and the second and third elements in Stream #2. Then we can create a parent parallel stream that consists of Stream #1 and Stream #2:
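import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.*;

// Slow filter that logs the thread it runs on (logger comes from the enclosing class).
Predicate<Integer> someFilter = i -> {
    try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); }
    logger.info(i + " filtered on thread id: " + Thread.currentThread().getId());
    return i != 7;
};
// Two inner streams; the filter is lazy and only runs once the parent stream is consumed.
Stream<Integer> stream1 = IntStream.rangeClosed(0, 3).boxed().filter(someFilter);
Stream<Integer> stream2 = IntStream.rangeClosed(-3, -1).boxed().filter(someFilter);
// The parent stream is parallel, so the two inner streams can be processed on different threads.
int numberOfFilteredElements = Stream.of(stream1, stream2)
    .parallel()
    .flatMap(Function.identity())
    .peek(integer -> logger.info("Peeked: " + integer.toString()))
    .collect(Collectors.toSet())
    .size();
System.out.println("Set size is: " + numberOfFilteredElements);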
Output:
2021-04-01 12:56:34.084 INFO 26336 --- [ main] com.example.demo.DemoApplication : Started DemoApplication in 2.966 seconds (JVM running for 5.717)
2021-04-01 12:56:38.098 INFO 26336 --- [ main] com.example.demo.DemoApplication : -3 filtered on thread id: 1
2021-04-01 12:56:38.098 INFO 26336 --- [onPool-worker-1] com.example.demo.DemoApplication : 0 filtered on thread id: 22
2021-04-01 12:56:38.100 INFO 26336 --- [ main] com.example.demo.DemoApplication : Peeked: -3
2021-04-01 12:56:38.100 INFO 26336 --- [onPool-worker-1] com.example.demo.DemoApplication : Peeked: 0
2021-04-01 12:56:42.100 INFO 26336 --- [onPool-worker-1] com.example.demo.DemoApplication : 1 filtered on thread id: 22
2021-04-01 12:56:42.100 INFO 26336 --- [ main] com.example.demo.DemoApplication : -2 filtered on thread id: 1
2021-04-01 12:56:42.100 INFO 26336 --- [ main] com.example.demo.DemoApplication : Peeked: -2
2021-04-01 12:56:42.100 INFO 26336 --- [onPool-worker-1] com.example.demo.DemoApplication : Peeked: 1
2021-04-01 12:56:46.101 INFO 26336 --- [onPool-worker-1] com.example.demo.DemoApplication : 2 filtered on thread id: 22
2021-04-01 12:56:46.101 INFO 26336 --- [ main] com.example.demo.DemoApplication : -1 filtered on thread id: 1
2021-04-01 12:56:46.101 INFO 26336 --- [ main] com.example.demo.DemoApplication : Peeked: -1
2021-04-01 12:56:46.101 INFO 26336 --- [onPool-worker-1] com.example.demo.DemoApplication : Peeked: 2
2021-04-01 12:56:50.102 INFO 26336 --- [onPool-worker-1] com.example.demo.DemoApplication : 3 filtered on thread id: 22
2021-04-01 12:56:50.102 INFO 26336 --- [onPool-worker-1] com.example.demo.DemoApplication : Peeked: 3
Set size is: 7
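Notice that Stream #1 (which can hold your first element) runs in parallel with Stream #2 (which can hold your second and third elements), while the elements within each stream are filtered one after another, as the 4-second gaps per thread in the log show.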
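Applied to the three elements from the question, the same idea might look like the sketch below. The class name GroupedParallelFilter, the helper countFiltered, and the use of plain strings as elements are all made up for illustration; the filter is passed in as a Predicate instead of calling element.someFilter(). It relies on the behaviour seen in the log above, namely that flatMap consumes each inner stream sequentially on the thread that handles its group (this is how current OpenJDK implementations behave, as far as I know).

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class GroupedParallelFilter {

    // Hypothetical helper: each inner list is a group whose elements must not
    // be filtered concurrently with each other (e.g. they share a resource).
    static <T> int countFiltered(List<List<T>> groups, Predicate<T> filter) {
        return groups.stream()
                .parallel()                                        // groups may run on different threads
                .flatMap(group -> group.stream().filter(filter))   // inside a group: one element at a time
                .collect(Collectors.toSet())
                .size();
    }

    public static void main(String[] args) {
        // Assumption: strings stand in for firstElement, secondElement, thirdElement.
        List<List<String>> groups = List.of(
                List.of("first"),             // independent element
                List.of("second", "third"));  // these two share a resource

        // Illustrative filter that rejects "third".
        int count = countFiltered(groups, s -> !s.equals("third"));
        System.out.println("Set size is: " + count); // prints "Set size is: 2"
    }
}
```

Note that this only keeps elements of the same group from being filtered at the same time. It does not guarantee that the two groups actually run on different threads; that depends on the common ForkJoinPool and the number of available cores.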
Upvotes: 1