Reputation: 12817
I am trying to mimic the behaviour of WireTap in Spring Integration and Apache Camel with a Java 8 stream: a copy of the data currently being processed is passed to the wire tap and handled in a separate thread, which is helpful for logging and auditing.
Here I only want the logging in peek to run on a separate thread.
List<String> lines = ...
List<String> upperLines = lines.stream()
.map(String::toUpperCase)
.parallel() // has no effect: the later sequential() call decides the mode of the whole pipeline
.peek(line -> System.out.println(line)) // don't want to run this fully on main thread
.sequential()
.collect(Collectors.toList());
Do I need to implement a separate method, using a BlockingQueue or an ExecutorService, to do this? For example:
.peek(this::logger)
Upvotes: 0
Views: 161
Reputation: 298113
There is no way to process parts of a stream pipeline in a different mode, and it wouldn’t pay off to implement such a mixed-mode pipeline, given how simple it is to submit an asynchronous job yourself:
ExecutorService es = Executors.newFixedThreadPool(4);
List<String> upperLines = lines.stream()
.map(String::toUpperCase)
.peek(line -> es.execute(() -> System.out.println(line)))
.collect(Collectors.toList());
es.shutdown();
or
List<String> upperLines = lines.stream()
.map(String::toUpperCase)
.peek(line -> CompletableFuture.runAsync(() -> System.out.println(line)))
.collect(Collectors.toList());
// when running this in the main method, avoid JVM termination before the async jobs:
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
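For completeness, the executor variant above can be wrapped into a small self-contained sketch. The class name WireTapSketch, the method upperWithTap and the Consumer parameter are hypothetical names chosen for illustration, and the pool size of 4 and the one-minute timeout are arbitrary assumptions. The only addition over the snippet above is an awaitTermination call, so the caller knows the logging jobs have finished before the method returns:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical helper class, not part of any library.
final class WireTapSketch {

    // Upper-cases every line on the calling thread and hands a copy of each
    // result to 'tap' on a worker thread (the "wire tap").
    public static List<String> upperWithTap(List<String> lines, Consumer<String> tap)
            throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(4); // pool size is an assumption
        try {
            return lines.stream()
                    .map(String::toUpperCase)
                    .peek(line -> es.execute(() -> tap.accept(line)))
                    .collect(Collectors.toList());
        } finally {
            es.shutdown();                             // accept no new jobs
            es.awaitTermination(1, TimeUnit.MINUTES);  // wait for pending tap jobs
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> upper = upperWithTap(Arrays.asList("a", "b", "c"), System.out::println);
        System.out.println(upper);
    }
}
```

With more than one worker thread, the order in which the tapped lines are printed is not guaranteed; Executors.newSingleThreadExecutor() could be swapped in if the log should follow the order of the list.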
But note that in this specific case, there is actually no perceivable difference compared to, e.g.:
List<String> upperLines = lines.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());
upperLines.parallelStream().forEach(line -> System.out.println(line));
or, if you do not want to wait for the completion of the logging statement:
List<String> upperLines = lines.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());
upperLines.forEach(line -> CompletableFuture.runAsync(() -> System.out.println(line)));
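If the logging should run in the background but you still want to be able to wait for it later, one possible sketch is to keep the futures and combine them with CompletableFuture.allOf. The class name DeferredLogSketch and the method logAsync are hypothetical names used only for this illustration; like the snippet above, it relies on the default executor of runAsync:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical helper class, not part of any library.
final class DeferredLogSketch {

    // Starts one asynchronous job per line and returns a future that
    // completes once all of those jobs have completed.
    public static CompletableFuture<Void> logAsync(List<String> lines, Consumer<String> tap) {
        CompletableFuture<?>[] jobs = lines.stream()
                .map(line -> CompletableFuture.runAsync(() -> tap.accept(line)))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(jobs);
    }

    public static void main(String[] args) {
        List<String> upperLines = Arrays.asList("a", "b", "c").stream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());

        CompletableFuture<Void> logging = logAsync(upperLines, System.out::println);
        // ... other work on the main thread can happen here ...
        logging.join(); // wait for the logging only when it actually matters
    }
}
```

The caller decides when, or whether, to call join(), so the main thread is not blocked by the logging unless it chooses to be.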
Upvotes: 2