mabi
mabi

Reputation: 5306

Best way to sequentially pass list values to single value consumer?

I'm toying with Java8's streams and CompletableFutures. My pre-existing code has a class that takes a single URL and downloads it:

public class FileDownloader implements Runnable {
    private URL target;
    public FileDownloader(String target) {
        this.target = new URL(target);
    }
    public void run() { /* do it */ }
}

Now, this class gets it's information from another part that emits List<String> (a number of targets on a single host).

I've switched the surrounding code to CompletableFuture:

public class Downloader {
    public static void main(String[] args) {
        List<String> hosts = fetchTargetHosts();
        for (String host : hosts) {
            HostDownloader worker = new HostDownloader(host);
            CompletableFuture<List<String>> future = 
                CompletableFuture.supplyAsync(worker);
            future.thenAcceptAsync((files) -> {
                for (String target : files) {
                    new FileDownloader(target).run();
                }
            });
        }
    }

    public static class HostDownloader implements Supplier<List<String>> {
        /* not shown */ 
    }
    /* My implementation should either be Runnable or Consumer.
       Please suggest based on a idiomatic approach to the main loop.
     */
    public static class FileDownloader implements Runnable, Consumer<String> { 
        private String target;
        public FileDownloader(String target) {
            this.target = target;
        }

        @Override
        public void run() { accept(this.target); }

        @Override
        public void accept(String target) {
            try (Writer output = new FileWriter("/tmp/blubb")) {
                output.write(new URL(target).getContent().toString());
            } catch (IOException e) { /* just for demo */ }
        }
    }
}

Now, this doesn't feel natural. I'm producing a stream of Strings and my FileDownloader consumes one of them at a time. Is there a readymade to enable my single value Consumer to work with Lists or am I stuck with the for loop here?

I know it's trivial to move the loop into the accept and just make a Consumer<List<String>>, that's not the point.

Upvotes: 5

Views: 3577

Answers (3)

Holger
Holger

Reputation: 298311

There is no point in dissolving two directly dependent steps into two asynchronous steps. They are still dependent and if the separation has any effect, it won’t be a positive one.

You can simply use

List<String> hosts = fetchTargetHosts();
FileDownloader fileDownloader = new FileDownloader();
for(String host: hosts)
    CompletableFuture.runAsync(()->
        new HostDownloader(host).get().forEach(fileDownloader));

or, assuming that FileDownloader does not have mutable state regarding a download:

for(String host: hosts)
    CompletableFuture.runAsync(()->
        new HostDownloader(host).get().parallelStream().forEach(fileDownloader));

This still has the same level of concurrency as your original approach using supplyAsync plus thenAcceptAsync, simply because these two dependent steps can’t run concurrently anyway, so the simple solution is to put both steps into one concise operation that will be executed asynchronously.


However, at this point it’s worth noting that the entire use of CompletableFuture is not recommended for this operation. As it’s documentation states:

The problem with the common pool is that its pre-configured concurrency level depends on the number of CPU cores and won’t be adjusted if threads are blocked during an I/O operation. In other words, it is unsuitable for I/O operations.

Unlike Stream, CompletableFuture allows you to specify an Executor for the async operations, so you can configure your own Executor to be suitable for I/O operations, on the other hand, when you deal with an Executor anyway, there is no need for CompletableFuture at all, at least not for such a simple task:

List<String> hosts = fetchTargetHosts();

int concurrentHosts = 10;
int concurrentConnections = 100;
ExecutorService hostEs=Executors.newWorkStealingPool(concurrentHosts);
ExecutorService connEs=Executors.newWorkStealingPool(concurrentConnections);

FileDownloader fileDownloader = new FileDownloader();
for(String host: hosts) hostEs.execute(()-> {
    for(String target: new HostDownloader(host).get())
        connEs.execute(()->fileDownloader.accept(target));
});

At this place you may consider either, to inline the code of FileDownloader.accept into the lambda expression or to revert it to be a Runnable so that you can change the inner loop’s statement to connEs.execute(new FileDownloader(target)).

Upvotes: 4

Sharon Ben Asher
Sharon Ben Asher

Reputation: 14348

I think you need to do forEach like that:

for (String host : hosts) {
    HostDownloader worker = new HostDownloader(host);
    CompletableFuture<List<String>> future = 
            CompletableFuture.supplyAsync(worker);
    future.thenAcceptAsync(files -> 
            files.stream()
            .forEach(target -> new FileDownloader(target).run())
    );
}

by the way, you could do the same with the main loop...

edit: Since OP edited original post, adding implementation details of FileDownloader, I am editing my answer accordingly. Java 8 functional interface is meant to allow the use of lambda expr in place of concrete Class. It is not meant to be implemented like regular interface 9although it can be) Therefor, "to take advantage of" Java 8 consumer means replacing FileDownloader with the code of accept like this:

for (String host : hosts) {
    HostDownloader worker = new HostDownloader(host);
    CompletableFuture<List<String>> future = CompletableFuture.supplyAsync(worker);
    future.thenAcceptAsync(files -> 
            files.forEach(target -> {
                try (Writer output = new FileWriter("/tmp/blubb")) {
                    output.write(new URL(target).getContent().toString());
                } catch (IOException e) { /* just for demo */ }
            })
    );
}

Upvotes: 1

assylias
assylias

Reputation: 328727

An alternative would be:

CompletableFuture.supplyAsync(worker)
                 .thenApply(list -> list.stream().map(FileDownloader::new))
                 .thenAccept(s -> s.forEach(FileDownloader::run));

Upvotes: 1

Related Questions