Saravana

Reputation: 12817

How to run only certain intermediate operations in parallel in Java-8?

I am trying to mimic the behaviour of WireTap in Spring Integration and Apache Camel with a Java 8 stream: a copy of the data currently being processed is passed to the WireTap, which processes it in a separate thread. This would be helpful for logging and auditing.

Here, I want only the logging in peek to run on a separate thread:

List<String> lines = ...

List<String> upperLines = lines.stream()
    .map(String::toUpperCase)
    .parallel() // has no effect: overridden by the later sequential() call
    .peek(line -> System.out.println(line)) // should not run on the main thread
    .sequential()
    .collect(Collectors.toList());

Do I need to implement a separate method, using a BlockingQueue or an ExecutorService, to do this?

.peek(this::logger)

Upvotes: 0

Views: 161

Answers (1)

Holger

Reputation: 298113

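There is no way to process parts of a stream pipeline in a different mode. The parallel/sequential setting applies to the whole pipeline, and the last call before the terminal operation wins. It also wouldn’t pay off to implement such a mixed-mode pipeline, given how simple it is to submit an asynchronous job yourself: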

ExecutorService es = Executors.newFixedThreadPool(4);
List<String> upperLines = lines.stream()
    .map(String::toUpperCase)
    .peek(line -> es.execute(() -> System.out.println(line)))
    .collect(Collectors.toList());
es.shutdown();

or

List<String> upperLines = lines.stream()
    .map(String::toUpperCase)
    .peek(line -> CompletableFuture.runAsync(() -> System.out.println(line)))
    .collect(Collectors.toList());
// when running this in the main method, avoid JVM termination before the async jobs:
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);

But note that in this specific case, there is no perceivable difference compared to, e.g.:

List<String> upperLines = lines.stream()
    .map(String::toUpperCase)
    .collect(Collectors.toList());
upperLines.parallelStream().forEach(line -> System.out.println(line));

or, if you do not want to wait for the completion of the logging statement:

List<String> upperLines = lines.stream()
    .map(String::toUpperCase)
    .collect(Collectors.toList());
upperLines.forEach(line -> CompletableFuture.runAsync(() -> System.out.println(line)));
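
For completeness, here is a minimal, self-contained sketch of the first variant wrapped in a reusable method. The names WireTapSketch, upperCaseWithTap and tap are made up for illustration and are not part of any library. It assumes a single-threaded executor is acceptable, which keeps the tapped lines in encounter order for a sequential stream, and it waits for the tap to drain before returning so that nothing is lost:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

class WireTapSketch {

    // Hypothetical helper: upper-cases each line on the calling thread and
    // hands every result to `tap` on a separate worker thread.
    static List<String> upperCaseWithTap(List<String> lines, Consumer<String> tap) {
        // Assumption: one worker thread is enough and preserves submission order.
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            return lines.stream()
                .map(String::toUpperCase)
                .peek(line -> es.execute(() -> tap.accept(line))) // only submits, does not block
                .collect(Collectors.toList());
        } finally {
            es.shutdown(); // accept no new jobs, but finish the queued ones
            try {
                es.awaitTermination(1, TimeUnit.MINUTES); // wait for the tap to drain
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
        }
    }

    public static void main(String[] args) {
        List<String> upper = upperCaseWithTap(
            Arrays.asList("foo", "bar"),
            line -> System.out.println(Thread.currentThread().getName() + " logged " + line));
        System.out.println(upper);
    }
}
```

If you do not want to wait for the logging at all, drop the awaitTermination call and keep a long-lived executor that is shut down when the application stops.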

Upvotes: 2
