Alex R

Reputation: 11891

How to flatten a list inside a stream of completable futures?

I have this:

Stream<CompletableFuture<List<Item>>>

how can I convert it to

Stream<CompletableFuture<Item>>

where the second stream consists of every Item inside every one of the lists in the first stream.

I looked into thenCompose, but that solves a completely different problem, which is also referred to as "flattening".

How can this be done efficiently, in a streaming fashion, without blocking or prematurely consuming more stream items than necessary?

Here is my best attempt so far:

    ExecutorService pool = Executors.newFixedThreadPool(PARALLELISM);
    Stream<CompletableFuture<List<IncomingItem>>> reload = ... ;

    @SuppressWarnings("unchecked")
    CompletableFuture<List<IncomingItem>> allFutures[] = reload.toArray(CompletableFuture[]::new);
    CompletionService<List<IncomingItem>> queue = new ExecutorCompletionService<>(pool);
    for(CompletableFuture<List<IncomingItem>> item: allFutures) {
        queue.submit(item::get);
    }
    List<IncomingItem> THE_END = new ArrayList<IncomingItem>();
    CompletableFuture<List<IncomingItem>> ender = CompletableFuture.allOf(allFutures).thenApply(whatever -> {
        queue.submit(() -> THE_END);
        return THE_END;
    });
    queue.submit(() -> ender.get());
    Iterable<List<IncomingItem>> iter = () -> new Iterator<List<IncomingItem>>() {
        boolean checkNext = true;
        List<IncomingItem> next = null;
        @Override
        public boolean hasNext() {
            if(checkNext) {
                try {
                    next = queue.take().get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
                checkNext = false;
            }
            if(next == THE_END || next == null) {
                return false;
            }
            else {
                return true;
            }
        }
        @Override
        public List<IncomingItem> next() {
            if(checkNext) {
                hasNext();
            }
            if(!hasNext()) {
                throw new IllegalStateException();
            }
            checkNext = true;
            return next;
        }
    };
    Stream<IncomingItem> flat = StreamSupport.stream(iter.spliterator(), false).flatMap(List::stream);

This works at first; unfortunately, it has a fatal bug: the resulting stream seems to terminate prematurely, before all the items have been retrieved.

2023 UPDATE

Four years later, I still have not chosen an answer, because both answers at the moment are saying this is "impossible". That doesn't sound right. I'm simply asking for an efficient way to take items that are completed in batches and be able to monitor their completion individually. Perhaps this is easier with Virtual Threads in Java 21? I no longer maintain this codebase but the question remains a great unsolved problem.

Update#2: Clarification of assumptions

According to https://www.geeksforgeeks.org/completablefuture-in-java/, a CompletableFuture represents the future result of an asynchronous computation. Interestingly, the Javadocs are not quite as clear.
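For reference, the Javadoc describes CompletableFuture as a Future that may be explicitly completed, so it is not necessarily backed by any running computation. A minimal sketch of that (the class and method names are hypothetical, for illustration only):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ExplicitCompletionDemo {

    // Returns a future that no thread is computing; it completes only when someone calls complete().
    static CompletableFuture<List<String>> unbacked() {
        return new CompletableFuture<>();
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> f = unbacked();
        // Attaching a dependent stage does not start any computation; it only registers an action.
        CompletableFuture<Integer> size = f.thenApply(List::size);
        System.out.println(size.isDone()); // false: nothing has completed f yet
        f.complete(List.of("a", "b"));      // explicit completion triggers the dependent action
        System.out.println(size.join());    // 2
    }
}
```

The dependent thenApply action runs when the source future is completed, on the thread that completes it.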

Update#3:

Performance is not the focus of the question, only efficiency in a specific sense: if task 1 takes T1 to complete and task 2 takes T2, then waiting a total of max(T1, T2) for both is more efficient than waiting T1 + T2. Extending that to N-sized lists of tasks T, U, V, ..., the total probably looks something like max(T1..Tn) + max(U1..Un) + max(V1..Vn), or hopefully max(T1..Vn). Assuming the same N for every list is only an explanatory simplification (really the sizes should be I, J, K, ...). In other words, please assume that the asynchronous tasks represented in the Stream of Lists are I/O-bound, not CPU-bound. This can be simulated by inserting random Thread.sleep() calls if you wish to demonstrate something in code. Apologies for the sloppy notation: I have a background in Computer Science, but it's been a while since I've tried to formally describe a problem like this.
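A minimal sketch of the suggested Thread.sleep() simulation, assuming a thread pool with one thread per task (the class name, durations, and pool sizing are illustrative choices, not part of the original setup). Because every task is submitted before any join, the elapsed time should be close to max(Ti) rather than the sum:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MaxVsSumDemo {

    // Simulates an I/O-bound task by sleeping, as suggested in the question.
    static CompletableFuture<Long> sleepTask(long millis, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return millis;
        }, pool);
    }

    // Starts all tasks before waiting on any of them; returns elapsed wall-clock time in ms.
    static long runConcurrently(List<Long> durations) {
        ExecutorService pool = Executors.newFixedThreadPool(durations.size());
        try {
            long start = System.nanoTime();
            List<CompletableFuture<Long>> futures = durations.stream()
                    .map(d -> sleepTask(d, pool))
                    .toList(); // terminal operation: every task is submitted here
            futures.forEach(CompletableFuture::join);
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Sum of durations is 600 ms, max is 300 ms; expect roughly 300 ms plus overhead.
        System.out.println(runConcurrently(List.of(100L, 200L, 300L)) + " ms");
    }
}
```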

Update#4

Based on the answer from @Slaw, I can see that the core problem I was facing is the mismatch between stream arrival order (the order in which futures are provided through the Stream) and future completion order (the order in which each CompletableFuture finishes executing and makes the List within it available). So a revised TL;DR for this question is: how can you take a Stream that's produced in arbitrary order and re-order it into the order in which its futures complete, so that a flatMap() operation will not block unnecessarily?

Upvotes: 10

Views: 2148

Answers (8)

Slaw

Reputation: 46156

The Problem

As I understand it, your problem can be summarized by the following:

  1. You have multiple producers running concurrently. These producers are instances of CompletableFuture.

    • In your case, you are getting these futures from a Stream, but the actual source of the futures doesn't seem critical, as long as the source is a container that can be iterated in some way.
  2. You have a single consumer running concurrently with the producers. The consumer is a Stream.

  3. You want to stay within the stream API, at least from an external perspective. In other words, you want to map Stream<CompletableFuture<T>> (the producers) to Stream<T> (the consumer).

  4. You want to consume the result of a producer as soon as possible: a completed result should never wait to be processed just because the consumer is blocked waiting for a different producer to complete.

At a fundamental level, the question seems to be how to stream the results from asynchronous computations without blocking on the futures themselves.


Solution

The major hurdle to overcome here is that we don't know when a future will complete. That wouldn't matter much if you didn't care about processing results as soon as possible, because then you could just do:

stream.map(CompletableFuture::join)

But you do care about processing results as soon as possible, and the above will wait on a future even if another one has completed. To do what you want, we'll have to use an intermediate BlockingQueue that each future adds its result to when complete (without blocking). Then we need another separate stream that reads from that queue until all results have been processed.
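The core of this idea can be sketched in isolation. This is a simplified, eager variant (the class and method names are hypothetical): unlike the spliterator below, it collects the entire source stream up front, so it gives up laziness, and it rethrows the first failure it takes from the queue.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class CompletionOrderSketch {

    // Hypothetical helper: yields the futures' results in the order the futures complete.
    static <T> Stream<T> inCompletionOrder(Stream<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> all = futures.toList(); // eager: drains the whole source stream
        BlockingQueue<CompletableFuture<T>> done = new LinkedBlockingQueue<>();
        // Each callback runs when its future completes and enqueues it without blocking anyone.
        all.forEach(f -> f.whenComplete((value, error) -> done.add(f)));
        // Take exactly one completed future per source future, in completion order.
        return IntStream.range(0, all.size()).mapToObj(i -> {
            try {
                // Already complete, so join() does not block; a failure is rethrown here.
                return done.take().join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
        });
    }
}
```

For example, with three pending futures a, b, c passed in that order and then completed in the order c, b, a, the returned stream yields the results of c, b, a.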

Here's a slightly modified version of Holger's solution which:

  • Can work with any CompletionStage implementation.

  • Gives you the stage results directly instead of flattening them for you. If your results are lists, then you can just do flatMap(List::stream) when needed.

    • As an aside, addressing the original question, I'm of the opinion that if your results are lists, then wrapping each individual element of a list in its own CompletableFuture doesn't add much value. Especially since each future will have to be, by design, already completed (normally or exceptionally). If you want to launch asynchronous tasks for each element in a list, do that downstream. And if you want to handle exceptions, rather than crashing the stream on the first failure, then you can modify the below to expose the nested Result class (possibly modified itself) or handle the exceptions in the stages upstream (e.g., exceptionally[Async] or handle[Async] stages).
  • Implements splitting based on the result of calling trySplit() on the source Spliterator.
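As a sketch of the "handle the exceptions in the stages upstream" option: each batch is mapped through exceptionally so that a failure becomes an empty list (a policy choice assumed here) instead of crashing the stream. The flattening below is a plain join in encounter order, only to keep the example short; the same mapping could be applied before handing the stream to a completion-order approach. Class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

public class RecoverUpstream {

    // Replaces a failed batch with an empty list; the failure is logged and then dropped.
    static <T> CompletableFuture<List<T>> orEmpty(CompletableFuture<List<T>> batch) {
        return batch.exceptionally(error -> {
            System.err.println("Batch failed, skipping: " + error);
            return List.of();
        });
    }

    static <T> List<T> flattenRecovering(Stream<CompletableFuture<List<T>>> batches) {
        return batches
                .map(RecoverUpstream::orEmpty)
                .flatMap(f -> f.join().stream()) // join cannot fail here: failures were recovered
                .toList();
    }

    public static void main(String[] args) {
        Stream<CompletableFuture<List<Integer>>> batches = Stream.of(
                CompletableFuture.completedFuture(List.of(1, 2)),
                CompletableFuture.<List<Integer>>failedFuture(new IllegalStateException("boom")),
                CompletableFuture.completedFuture(List.of(3)));
        System.out.println(flattenRecovering(batches)); // [1, 2, 3]
    }
}
```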

Note: This replaces my old, inferior solution. You can look at the edit history to see the old solution if you want.  

import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class WhenCompleteSpliterator<T> implements Spliterator<T> {

    public static <T> Stream<T> streamResults(Iterable<? extends CompletionStage<T>> iterable) {
        return StreamSupport.stream(new WhenCompleteSpliterator<>(iterable.spliterator()), false);
    }

    public static <T> Stream<T> mapToResults(Stream<? extends CompletionStage<T>> stream) {
        return StreamSupport.stream(new WhenCompleteSpliterator<>(stream.spliterator()), false);
    }

    private final BlockingQueue<Result<T>> queue = new LinkedBlockingQueue<>();
    private final AtomicLong pending = new AtomicLong();
    private Result<T> immediate;

    private final Spliterator<? extends CompletionStage<T>> upstream;
    private final long estimatedSize;

    public WhenCompleteSpliterator(Spliterator<? extends CompletionStage<T>> upstream) {
        this.upstream = Objects.requireNonNull(upstream);
        this.estimatedSize = upstream.estimateSize();
    }

    @Override
    public long estimateSize() {
        return estimatedSize;
    }

    @Override
    public int characteristics() {
        return 0; // No characteristics.
    }

    @Override
    public Spliterator<T> trySplit() {
        // Split based on upstream's ability to split.
        var upstreamSplit = upstream.trySplit();
        return upstreamSplit == null ? null : new WhenCompleteSpliterator<>(upstreamSplit);
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        Objects.requireNonNull(action);

        // Take from queue immediately if not empty.
        if (takeFromQueue(false, action)) {
            return true;
        }

        // Consume upstream stages until all consumed or an already-completed stage is encountered.
        while (upstream.tryAdvance(this::consumeStage)) {
            if (takeImmediate(action)) {
                return true;
            }
        }

        // Take from queue if not empty OR there are pending stages.
        if (takeFromQueue(true, action)) {
            return true;
        }

        return false;
    }

    private void consumeStage(CompletionStage<T> stage) {
        var cf = stage instanceof CompletableFuture<T> self ? self : stage.toCompletableFuture();
        if (cf.isDone()) {
            try {
                immediate = Result.ofSuccess(cf.join());
            } catch (CompletionException | CancellationException ex) {
                immediate = Result.ofFailure(ex);
            }
        } else {
            pending.incrementAndGet();
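            // 'pending' is decremented by the consumer when it takes this result (see takeFromQueue),
            // not here: decrementing after queue.add would let the consumer take the last result,
            // still see pending > 0, and then block forever in queue.take().
            stage.whenComplete((value, error) ->
                    queue.add(error == null ? Result.ofSuccess(value) : Result.ofFailure(error)));
        }
    }

    private boolean takeFromQueue(boolean waitForPending, Consumer<? super T> action) {
        try {
            if ((waitForPending && pending.get() > 0L) || !queue.isEmpty()) {
                var result = queue.take();
                pending.decrementAndGet();
                action.accept(result.valueOrThrow());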
                return true;
            }
            return false;
        } catch (InterruptedException ex) {
            throw new CompletionException("Interrupted while waiting for upstream stage result.", ex);
        }
    }

    private boolean takeImmediate(Consumer<? super T> action) {
        var immediate = this.immediate;
        if (immediate != null) {
            this.immediate = null;
            action.accept(immediate.valueOrThrow());
            return true;
        }
        return false;
    }

    private static class Result<T> {

        static <T> Result<T> ofSuccess(T value) {
            return new Result<>(value, null);
        }

        static <T> Result<T> ofFailure(Throwable error) {
            assert error != null;
            return new Result<>(null, error);
        }

        private final T value;
        private final Throwable error;

        private Result(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        T valueOrThrow() {
            if (error == null) {
                return value;
            } else {
                var message = "An upstream stage completed exceptionally.";
                if (error instanceof CompletionException || error instanceof ExecutionException) {
                    var cause = error.getCause();
                    if (cause != null) {
                        // Wrap exception in new CompletionException for stack trace purposes.
                        throw new CompletionException(message, cause);
                    }
                }
                // Wrap exception in new CompletionException for stack trace purposes.
                throw new CompletionException(message, error);
            }
        }
    }
}

And here's an example using the above:

import java.time.Clock;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;

public class Main {

    public static void main(String[] args) throws Exception {
        var clock = Clock.tickMillis(ZoneId.systemDefault());

        var executor = Executors.newVirtualThreadPerTaskExecutor();
        var futures = new Random().ints(5, 2, 30)
                .mapToObj(seconds -> CompletableFuture.supplyAsync(() -> {
                    try {
                        System.out.printf("[%s] Beginning %,2d second sleep!%n", LocalTime.now(clock), seconds);
                        Thread.sleep(seconds * 1000);
                        return String.format("[%s] Slept for %,2d seconds!", LocalTime.now(clock), seconds);
                    } catch (InterruptedException ex) {
                        throw new RuntimeException(ex);
                    }
                }, executor));
        WhenCompleteSpliterator.mapToResults(futures).forEach(System.out::println);
        System.out.println("DONE!");
    }
}

Note the example uses a virtual thread pool simply to ensure all "sleep tasks" are started around the same time, regardless of how many there are. A "normal" cached thread pool could do the same job (though you'd probably want to use daemon threads for the example).

And example output:

[22:07:24.134] Beginning 28 second sleep!
[22:07:24.132] Beginning  4 second sleep!
[22:07:24.134] Beginning 10 second sleep!
[22:07:24.133] Beginning 20 second sleep!
[22:07:24.133] Beginning  6 second sleep!
[22:07:28.166] Slept for  4 seconds!
[22:07:30.158] Slept for  6 seconds!
[22:07:34.169] Slept for 10 seconds!
[22:07:44.158] Slept for 20 seconds!
[22:07:52.160] Slept for 28 seconds!
DONE!

Virtual Threads

Virtual threads won't really help with the mapping from Stream<CompletableFuture<T>> to Stream<T>. Though since your background tasks are I/O bound, using a virtual thread pool to execute said background tasks might be beneficial.

Upvotes: 1

Holger

Reputation: 298311

Here is a solution which not only avoids blocking, by preferring to process already completed futures, but also retains lazy processing of the original stream as far as possible. As long as there are completed futures among the already encountered futures, it will not advance in the source traversal.

This is best demonstrated by an example using an infinite source which can still complete in finite time (and quite fast, due to the preference for completed futures).

Stream<CompletableFuture<List<Integer>>> streamOfFutures = Stream.generate(
    () -> CompletableFuture.supplyAsync(
        () -> ThreadLocalRandom.current().ints(10, 0, 200).boxed().toList(),
        CompletableFuture.delayedExecutor(
            ThreadLocalRandom.current().nextLong(5), TimeUnit.SECONDS))
);
System.out.println(flattenResults(streamOfFutures)
    .peek(System.out::println)
    .anyMatch(i -> i == 123)
);

The implementation will process already completed futures immediately on encounter. Only if the future has not yet completed will a queuing action be chained and the pending counter increased. Care must be taken to decrease the counter even on exceptional completion and to queue an item (an empty list) to unblock the consumer thread in case it's taking an element right at this point. The exception will be propagated to the caller when encountered. As with short-circuiting parallel streams, it's possible to miss errors if the result is found before all elements have been processed.

If the terminal operation is short-circuiting and finishes without processing the entire stream, the counter is irrelevant and the operation will not wait for the completion of pending futures. Only when the source stream has been traversed completely does the counter become relevant for detecting when all futures have completed.

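import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

static <T> Stream<T> flattenResults(Stream<CompletableFuture<List<T>>> stream) {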
    Spliterator<CompletableFuture<List<T>>> srcSp = stream.spliterator();
    BlockingQueue<List<T>> queue = new LinkedBlockingQueue<>();

    return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(
                                                 srcSp.estimateSize(), 0) {
        final AtomicLong pending = new AtomicLong();
        Spliterator<T> fromCurrentList;
        Throwable failure;

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if(checkExisting(action)) return true;

            while(srcSp.tryAdvance(this::checkNew)) {
                if(checkExisting(action)) return true;
            }

            return checkAfterSourceExhausted(action);
        }
        private boolean checkExisting(Consumer<? super T> action) {
            for(;;) {
                var sp = fromCurrentList;
                if(sp == null) {
                    List<T> newList = queue.poll();
                    if(newList == null) return false;
                    pending.decrementAndGet(); // the consumer accounts for every list it takes
                    fromCurrentList = sp = newList.spliterator();
                }
                if(sp.tryAdvance(action)) return true;
                fromCurrentList = null;
            }
        }

        private void checkNew(CompletableFuture<List<T>> f) {
            if(f.isDone()) fromCurrentList = f.join().spliterator();
            else {
                pending.incrementAndGet();
                f.whenComplete((r, t) -> {
                    if(t != null) {
                        failure = t;
                        r = List.of();
                    }
                    // pending is decremented by the consumer when it takes r; decrementing it
                    // here, after offer, could leave it above zero after the last list was taken
                    queue.offer(r);
                });
            }
        }

        private boolean checkAfterSourceExhausted(Consumer<? super T> action) {
            while(pending.get() != 0 || !queue.isEmpty()) {
                checkFailure();
                try {
                    List<T> newList = queue.take();
                    pending.decrementAndGet();
                    fromCurrentList = newList.spliterator();
                    if(checkExisting(action)) return true;
                } catch(InterruptedException ex) {
                    throw new CompletionException(ex);
                }
            }
            checkFailure(); // don't lose a failure that arrived with the last (empty) list
            return false;
        }

        private void checkFailure() {
            Throwable t = failure;
            if(t != null) {
                if(t instanceof RuntimeException rt) throw rt;
                if(t instanceof Error e) throw e;
                throw new CompletionException(t);
            }
        }
    }, false);
}

You may use something like

Stream<CompletableFuture<List<Integer>>> streamOfFutures = IntStream.range(0, 10)
    .mapToObj(i -> 
      CompletableFuture.supplyAsync(
        () -> IntStream.range(i * 10, (i + 1) * 10).boxed().toList(),
        CompletableFuture.delayedExecutor(10 - i, TimeUnit.SECONDS)));

System.out.println(flattenResults(streamOfFutures)
    .peek(System.out::println)
    .anyMatch(i -> i == 34)
);

to visualize the “first completed, first processed”. Or change the terminal operation to

flattenResults(streamOfFutures).forEach(System.out::println);

to demonstrate the completion of all futures is correctly recognized or

if(flattenResults(streamOfFutures).count() != 100)
    throw new AssertionError();
else
    System.out.println("Success");

to have something which can be integrated into automated test.

Upvotes: 4

Alex R

Reputation: 11891

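Here's a sketch of what I'm thinking now after a conversation with @Slaw:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Stream;

public class StreamTest {

    static <T> Stream<T> flattenResults(Stream<CompletableFuture<List<T>>> stream) {
        List<T> EOL = new ArrayList<>(); // end-of-stream sentinel, compared by identity
        BlockingQueue<List<T>> queue = new LinkedBlockingQueue<>();

        Thread.ofVirtual().start(() -> {
            // Register a callback on every future and keep the resulting dependent stages.
            CompletableFuture<?>[] offered = stream
                    .map(future -> future.thenAcceptAsync(queue::offer))
                    .toArray(CompletableFuture[]::new);

            // Add EOL only after every callback has executed, so it is never added prematurely.
            // Note: a failed future skips its callback, so its list is silently dropped here.
            CompletableFuture.allOf(offered).whenComplete((ignored, error) -> queue.offer(EOL));
        });

        return Stream.generate(() -> {
                    try {
                        return queue.take();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                })
                .takeWhile(list -> list != EOL) // Java 9+: stop at the sentinel
                .flatMap(List::stream);
    }
}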

Upvotes: 0

Roma S

Reputation: 104

In your update4 you say:

How can you take a Stream that's produced in arbitrary order, and re-order it into futures execution completion order, so that a flatMap() operation will not block unnecessarily?

Suppose you have 3 futures that complete in 2, 5, and 10 seconds respectively, and you want a method that returns a stream ordered by execution time. In that case, you have to wait until all futures are done (or have failed), because until you know whether a future is done or has failed, you don't know how long its execution took.
You can create an ordered stream of results with blocking:

    private static final ExecutorService executorService = Executors.newFixedThreadPool(15);
    private static final ExecutorService consumerService = Executors.newFixedThreadPool(15);
    private static final Random random = new Random();
    private static final AtomicInteger ID = new AtomicInteger(0);
    private static final List<DelayedListSupplier> completionOrder = Collections.synchronizedList(new ArrayList<>());

    @RequiredArgsConstructor
    @Getter
    public static class DelayedListSupplier implements Supplier<List<String>> {
        private final int delayMs;
        private final int id;

        @Override
        public List<String> get() {
            try {
                if (random.nextInt(10) < 2) {
                    System.out.println(
                            String.format(
                                    "Thread [%s]; Future %s FAILED.",
                                    Thread.currentThread().getName(),
                                    id
                            )
                    );
                    throw new RuntimeException("Error!" + id);
                }
                Thread.sleep(delayMs);
                System.out.println(
                        String.format(
                                "Thread [%s]; Future %s is completed.",
                                Thread.currentThread().getName(),
                                id
                        )
                );
                //just for SOUT
                completionOrder.add(this);
                return List.of("Future" + id);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

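    public static void main(String[] args) throws Exception {
        System.out.println("Start blocking");
        blockingAnswer();
        System.out.println("Start non blocking");
        nonBlockingAnswer();
        // Let the producers finish first: their whenComplete callbacks submit work to
        // consumerService, so consumerService must not be shut down before that happens.
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.SECONDS);
        consumerService.shutdown();
        consumerService.awaitTermination(10, TimeUnit.SECONDS);
    }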

    private static void nonBlockingAnswer() {
        List<DelayedListSupplier> delayedSuppliers = createDelayedSuppliers();
        Stream<CompletableFuture<List<String>>> futuresStream = getFutures(delayedSuppliers);
        futuresStream.forEachOrdered(cf -> {
            cf.whenComplete((r,e) -> {
                if (e == null) {
                    consumerService.execute(() -> {
                        //handle results here
                        System.out.println("Received results from " + r.get(0));
                    });
                }
            });
        });
    }

    private static void blockingAnswer() throws InterruptedException {
        //prepare futures
        List<DelayedListSupplier> suppliers = createDelayedSuppliers();
        //map them to CompletableFuture
        Stream<CompletableFuture<List<String>>> cf = getFutures(suppliers);
        List<List<String>> results = Collections.synchronizedList(new LinkedList<>());

        // collect the futures so we know how many the CountDownLatch has to wait for
        List<CompletableFuture<List<String>>> futures = cf.collect(Collectors.toList());

        int maxWaitTime = suppliers.stream()
                .max(Comparator.comparing(DelayedListSupplier::getDelayMs))
                .map(DelayedListSupplier::getDelayMs)
                .orElseThrow();
        long start = System.currentTimeMillis();
        // the method may only return once all N futures have completed or failed
        CountDownLatch cdl = new CountDownLatch(futures.size());
        futures.forEach(f -> {
            // add post-execution logic for each future
            f.whenComplete((r, e) -> {
                // if there was no exception, r is the actual result
                if (e == null) {
                    results.add(r);
                }
                // count down only after recording the result, so that once await() returns,
                // every successful result is guaranteed to be in 'results'
                cdl.countDown();
            });
        });

        // wait until every future is done (or 1000 days have passed)
        cdl.await(1000, TimeUnit.DAYS);
        long end = System.currentTimeMillis();
        System.out.println(
                String.format(
                        "Execution time: %s ms; Max wait time: %s",
                        end - start,
                        maxWaitTime
                )
        );
        System.out.println("Future Results: " + completionOrder.stream().map(s -> "" + s.getId()).collect(Collectors.joining(", ")));

        //our result stream here
        Stream<String> resultStream = results.stream().flatMap(Collection::stream);
        System.out.println(
                String.format(
                        "FlatMap: %s",
                        resultStream.collect(Collectors.joining(", "))
                )
        );
    }

    private static List<DelayedListSupplier> createDelayedSuppliers() {
        List<DelayedListSupplier> suppliers = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            suppliers.add(
                    new DelayedListSupplier(
                            (random.nextInt(6) + 1) * 1000,
                            ID.incrementAndGet()
                    )
            );
        }
        return suppliers;
    }

    private static Stream<CompletableFuture<List<String>>> getFutures(List<DelayedListSupplier> suppliers) {
        return suppliers.stream()
                .map(s -> CompletableFuture.supplyAsync(s, executorService));
    }

For a fully non-blocking approach, you have to use the producer/consumer pattern (see the nonBlockingAnswer method).

Upvotes: 0

daniu

Reputation: 15008

This has been discussed, but I'd still like to give my variation of Nishika's answer with a slight repositioning:

Stream<CompletableFuture<List<Item>>> origStream = // ...
// make streams out of the lists first
final Stream<CompletableFuture<Stream<Item>>> streamFutures =
             origStream.map(f -> f.thenApply(Collection::stream));

This makes it clearer that the actual item streams of the individual futures can only be retrieved when their completion stage is finished.

Additionally, you'll only be able to flatMap these streams by again waiting for the individual completion, i.e., joining on the futures. So now you can just do:

final Stream<Item> itemStream = streamFutures.flatMap(CompletableFuture::join);
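A small sketch to make that waiting visible (the class name and delays are hypothetical): the batches complete in the reverse of their stream order, yet flatMap(CompletableFuture::join) still yields items in encounter order, so the first join waits for the slowest batch even though the others finished earlier.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class EncounterOrderDemo {

    static List<Integer> flattenByJoin() {
        // Batch i completes after (3 - i) * 100 ms, so batch 0 finishes last.
        List<CompletableFuture<Stream<Integer>>> futures = IntStream.range(0, 3)
                .mapToObj(i -> CompletableFuture.supplyAsync(
                                () -> List.of(i * 10, i * 10 + 1),
                                CompletableFuture.delayedExecutor((3 - i) * 100L, TimeUnit.MILLISECONDS))
                        .thenApply(List::stream))
                .collect(Collectors.toList()); // start all batches before joining any
        // join() blocks on each future in encounter order, regardless of completion order.
        return futures.stream().flatMap(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(flattenByJoin()); // [0, 1, 10, 11, 20, 21]
    }
}
```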

You can make a CompletableFuture out of this:

final CompletableFuture<Stream<Item>> itemStreamFuture =
                CompletableFuture.supplyAsync(() -> streamFutures.flatMap(CompletableFuture::join));

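but you hardly gain any concurrency, except that the call itself isn't blocking. Since flatMap is lazy, the supplier only assembles the pipeline; the joins still happen, in encounter order, on whichever thread later runs the terminal operation. I'd wager the joins themselves are handled concurrently if the stream is created as parallel.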
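A quick check of what the supplyAsync variant does (flattenAsync is a hypothetical helper with the same body as itemStreamFuture above): because flatMap is lazy, the returned future completes as soon as the pipeline is assembled, while the underlying batch is still pending, and the join only runs when a terminal operation consumes the stream.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

public class LazyFlatMapDemo {

    // Hypothetical helper with the same shape as itemStreamFuture in the answer.
    static <T> CompletableFuture<Stream<T>> flattenAsync(Stream<CompletableFuture<Stream<T>>> streamFutures) {
        // The supplier only builds a lazy pipeline; no join() runs inside it.
        return CompletableFuture.supplyAsync(() -> streamFutures.flatMap(CompletableFuture::join));
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> batch = new CompletableFuture<>(); // not completed yet
        CompletableFuture<Stream<String>> batchStream = batch.thenApply(List::stream);
        Stream<String> items = flattenAsync(Stream.of(batchStream)).join();
        System.out.println(batch.isDone()); // false: the outer future completed without joining

        batch.complete(List.of("a", "b"));
        // The join inside flatMap runs here, on the thread executing the terminal operation.
        System.out.println(items.toList()); // [a, b]
    }
}
```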

Upvotes: 0

davidalayachew

Reputation: 1045

I have this:

Stream<CompletableFuture<List<Item>>>

how can I convert it to

Stream<CompletableFuture<Item>>

I looked into thenCompose but that solves a completely different problem which is also referred to as "flattening".

How can this be done efficiently, in a streaming fashion, without blocking or prematurely consuming more stream items than necessary?

This is absolutely possible, but long story short, it will save you no work, it will add overhead in ways that provide you no benefit, and will actually make your code slower in the long run. You will have accomplished nothing of value by doing this.

The reason for this is simple: CompletableFuture operates on a push-down design. That means the INSTANT you attempt to attach a chaining call (thenApply, thenCompose, etc.) or a completion call (get, join, complete), CompletableFuture will IMMEDIATELY start computation. It may be easier to think of CompletableFuture as being eagerly evaluated the moment you chain a follow-up function to it. Not every follow-up function will trigger computation, but all of the useful ones will.

What this means is that, the second you try to even touch (metaphorically) a CompletableFuture, you have triggered it to start computing. And this computation is "necessary", so it still meets your requirements.

I am happy to go into detail about why the function you wanted to call on the CompletableFuture<Item> would either be the same impact as just fetching from the CompletableFuture<List<Item>>, or would be even more expensive and slower for no benefit. Please tell me, then I will amend this answer and explain exactly why your desired function will not help you in any useful way, other than being the form that you like for whatever arbitrary reason.

Regardless, if this is truly what you want, then here is how.


import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

public class SOQ_20230926
{

   public static void main(String[] args) throws Exception
   {
   
      record Item(int index) {}
   
      final Stream<CompletableFuture<List<Item>>> pointA =
         create
         (
            List
               .of
               (
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(1), new Item(2), new Item(3)),
                  List.of(new Item(4), new Item(5), new Item(6)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9)),
                  List.of(new Item(7), new Item(8), new Item(9))
               )
         )
         ;
   
      final Stream<CompletableFuture<Item>> pointZ = convert(pointA);
   
      final Item item =
         pointZ
            .parallel()
            .map(CompletableFuture::join)
            .filter(eachItem -> eachItem.index() == 4)
            .findFirst()
            .orElseThrow()
            ;
   
      System.out.println(item);
   
   }

   private static <T> Stream<CompletableFuture<T>> convert(final Stream<CompletableFuture<List<T>>> pointA)
   {
   
      return
         pointA
            .parallel()
            .map
            (
               eachFuture ->
                  eachFuture
                     .thenApply
                     (
                        batch ->
                           batch
                              .parallelStream()
                              .map(CompletableFuture::completedFuture) // value is already known; no extra async task needed
                              .toList()
                     )
            )
            .map(CompletableFuture::join) // waits for this batch's list: this is where the stream blocks
            .flatMap(List::stream)
            ;
   
   }

   private static <T> Stream<CompletableFuture<List<T>>> create(final List<List<T>> lists)
   {
   
      return
         lists
            .parallelStream()
            .map
            (
               eachList ->
                  CompletableFuture
                     .supplyAsync
                     (
                        () ->
                        {
                        
                           System.out.println("COMPUTATION WAS PERFORMED");
                        
                           return eachList;
                        
                        }
                     )
         
            )
            ;
   
   }

}

And here is the output.

COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
COMPUTATION WAS PERFORMED
Item[index=4]

As you can see, the Stream "stopped early", only doing the work necessary to find the Item it needed.

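If it had done more work than needed, we would have seen all 54 lines of COMPUTATION WAS PERFORMED. In this run we only see 31. Item 4 sits in the 23rd batch, and the parallel stream evaluated a few more batches speculatively, but the remaining batches were never opened because we had already found what we needed. Because the stream is parallel, the exact count can vary from run to run.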
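The parallel run above is not deterministic, so here is a sequential variant of the same pipeline. It uses a counter instead of print statements, which shows the short-circuiting more clearly. This is a sketch: the class name, the batch layout, and building the per-item futures with `completedFuture` are my own choices.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class ShortCircuitDemo {

    record Item(int index) {}

    // Returns how many batches were fetched before an Item with the given index was found.
    static int batchesFetched(int target) {
        AtomicInteger fetched = new AtomicInteger();

        // 10 batches: [1,2,3], [4,5,6], ..., [28,29,30]
        List<List<Item>> lists = IntStream.range(0, 10)
                .mapToObj(i -> List.of(new Item(3 * i + 1), new Item(3 * i + 2), new Item(3 * i + 3)))
                .toList();

        // Sequential and lazy: a batch is only requested when the stream pulls it.
        Stream<CompletableFuture<List<Item>>> batches = lists.stream()
                .map(list -> CompletableFuture.supplyAsync(() -> {
                    fetched.incrementAndGet();
                    return list;
                }));

        Stream<CompletableFuture<Item>> items = batches
                .map(CompletableFuture::join)              // waits for this batch only
                .flatMap(List::stream)                     // one element per Item
                .map(CompletableFuture::completedFuture);  // value already known, no extra task

        items.map(CompletableFuture::join)
                .filter(item -> item.index() == target)
                .findFirst()
                .orElseThrow();
        return fetched.get();
    }

    public static void main(String[] args) {
        System.out.println(batchesFetched(4)); // prints 2
    }
}
```

With a sequential stream, exactly the batches up to and including the one holding the target are fetched.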

This should meet your two requirements, which are the following.

I have this:

Stream<CompletableFuture<List<Item>>>

how can I convert it to

Stream<CompletableFuture<Item>>

How can this be done efficiently, in a streaming fashion, without blocking or prematurely consuming more stream items than necessary?

Upvotes: 0

Nishika

Reputation: 41

I agree with Johannes Kuhn. You can't know a future's result (and therefore how many Items its list holds) while it is still executing, so you cannot convert a Stream<CompletableFuture<List<Item>>> into a Stream<CompletableFuture<Item>>.

That said, you can merge the output into a single List<Item> with the following code, which blocks until every future has completed. It works on a Stream<CompletableFuture<List<Item>>>, or on a List<CompletableFuture<List<Item>>> once you call .stream() on it:

    List<Item> output = input
        .map(CompletableFuture::join)      // blocks until each list is available
        .flatMap(Collection::stream)       // flatten the lists into a single stream of Items
        .collect(Collectors.toList());
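Below is a self-contained version of that snippet. It assumes a minimal `Item` record, which is hypothetical since the question never shows its fields. Because `join` blocks, the result is an eager `List<Item>` rather than the lazily flattened `Stream<CompletableFuture<Item>>` the question asks for.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class MergeAll {

    record Item(int index) {}

    // Waits for every future and concatenates all lists into one List<Item>.
    static List<Item> merge(Stream<CompletableFuture<List<Item>>> input) {
        return input
                .map(CompletableFuture::join)   // blocks until each list is available
                .flatMap(Collection::stream)    // flatten List<Item> -> Item
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Stream<CompletableFuture<List<Item>>> input = Stream.of(
                CompletableFuture.supplyAsync(() -> List.of(new Item(1), new Item(2))),
                CompletableFuture.supplyAsync(() -> List.of(new Item(3))));
        System.out.println(merge(input));
        // prints [Item[index=1], Item[index=2], Item[index=3]]
    }
}
```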

Upvotes: 4

Johannes Kuhn

Reputation: 15173

As I wrote in my comment, this is impossible.

Consider some arbitrary service that returns a CompletableFuture<Integer>:

CompletableFuture<Integer> getDiceRoll();

I can now convert this CompletableFuture<Integer> to a Stream<CompletableFuture<List<Object>>> without any problem:

Stream<CompletableFuture<List<Object>>> futureList = Stream.of(getDiceRoll().thenApply(n -> Arrays.asList(new Object[n])));

Let's suppose there would be a general way to turn a Stream<CompletableFuture<List<T>>> into a Stream<CompletableFuture<T>>:

<T> Stream<CompletableFuture<T>> magic(Stream<CompletableFuture<List<T>>> arg);

Then I can do the following:

long diceRoll = magic(Stream.of(getDiceRoll().thenApply(n -> Arrays.asList(new Object[n])))).count();

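Wait, what?
I am now able to get an arbitrary integer out of a CompletableFuture. To produce that count, the stream returned by magic must know how many elements the list holds, and that is only known once the future has completed. So unless magic waits for the future, it knows the dice roll before the dice have been rolled. With some engineering effort, I could use the same trick to get all the information out of a CompletableFuture; after all, memory is just some numbers.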

So we have to conclude that a method like magic cannot exist without violating the fabric of time, unless it blocks until the future completes, which is exactly what the question rules out.
And this is the answer: there is no such method, because it cannot exist.
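The sketch below shows what any attempt at `magic` ends up doing. The names `countAfterRoll` and the fixed "roll" standing in for `getDiceRoll()` are hypothetical. It has to `join` each future, because the number of elements it emits is only known once the list exists. So `count()` returns the dice roll only after the future has completed, which is precisely the blocking the question wanted to avoid.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

class MagicMustWait {

    // The only shape "magic" can take: wait for each list, then wrap its elements.
    static <T> Stream<CompletableFuture<T>> magic(Stream<CompletableFuture<List<T>>> arg) {
        return arg
                .map(CompletableFuture::join)              // blocks until the list (and its size) is known
                .flatMap(List::stream)
                .map(CompletableFuture::completedFuture);
    }

    // Hypothetical stand-in for getDiceRoll(): a future holding a fixed roll.
    static long countAfterRoll(int roll) {
        CompletableFuture<Integer> diceRoll = CompletableFuture.supplyAsync(() -> roll);
        CompletableFuture<List<Object>> rolled = diceRoll.thenApply(n -> Arrays.asList(new Object[n]));
        // count() can only answer once the future has completed: this is the blocking step.
        return magic(Stream.of(rolled)).count();
    }

    public static void main(String[] args) {
        System.out.println(countAfterRoll(5)); // prints 5
    }
}
```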

Upvotes: 1
