Lii

Reputation: 12112

Closing streams in the middle of pipelines

When I execute this code which opens a lot of files during a stream pipeline:

public static void main(String[] args) throws IOException {
    Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"),
            100, (path, attr) -> path.toString().endsWith(".html"))
        .map(file -> runtimizeException(() -> Files.lines(file, StandardCharsets.ISO_8859_1)))
        .map(Stream::count)
        .forEachOrdered(System.out::println);
}

I get an exception:

java.nio.file.FileSystemException: /long/file/name: Too many open files

The problem is that Stream.count does not close the stream when it is done traversing it. But I don't see why it shouldn't, given that it is a terminal operation. The same holds for other terminal operations such as reduce and forEach. flatMap on the other hand closes the streams it consists of.

The documentation tells me to use a try-with-resources statement to close streams if necessary. In my case I could replace the count line with something like this:

.map(s -> { long c = s.count(); s.close(); return c; } )

But that is noisy and ugly and could be a real inconvenience in some cases with big, complex pipelines.

So my questions are the following:

  1. Why were the streams not designed so that terminal operations close the streams they are working on? That would make them work better with IO streams.
  2. What is the best solution for closing IO streams in pipelines?

runtimizeException is a method that wraps checked exceptions in RuntimeExceptions.
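The question does not show runtimizeException itself, so here is one plausible sketch of such a helper (the name is from the question, but the signature and implementation are assumptions):

```java
import java.util.concurrent.Callable;

public class Exceptions {

    // Hypothetical implementation of the question's runtimizeException:
    // runs a Callable and rethrows any checked exception wrapped in a
    // RuntimeException, so it can be used inside stream lambdas.
    public static <T> T runtimizeException(Callable<T> action) {
        try {
            return action.call();
        } catch (RuntimeException e) {
            throw e;                       // let unchecked exceptions pass through
        } catch (Exception e) {
            throw new RuntimeException(e); // wrap checked exceptions
        }
    }
}
```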

Upvotes: 24

Views: 2988

Answers (5)

Lii

Reputation: 12112

It is possible to create a utility method that reliably closes streams in the middle of a pipeline.

This makes sure that each resource is closed with a try-with-resources statement, avoids the need for a custom functional interface, and is much less verbose than writing the try statement directly in the lambda.

With this method the pipeline from the question looks like this:

Files.find(Paths.get("Java_8_API_docs/docs/api"), 100,
        (path, attr) -> path.toString().endsWith(".html"))
    .map(file -> applyAndClose(
        () -> Files.lines(file, StandardCharsets.ISO_8859_1),
        Stream::count))
    .forEachOrdered(System.out::println);

The implementation looks like this:

/**
 * Applies a function to a resource and closes it afterwards.
 * @param sup Supplier of the resource that should be closed
 * @param op operation that should be performed on the resource before it is closed
 * @return The result of calling op.apply on the resource 
 */
private static <A extends AutoCloseable, B> B applyAndClose(Callable<A> sup, Function<A, B> op) {
    try (A res = sup.call()) {
        return op.apply(res);
    } catch (RuntimeException exc) {
        throw exc;
    } catch (Exception exc) {
        throw new RuntimeException("Wrapped in applyAndClose", exc);
    }
}

(Since resources that need to be closed often also throw exceptions when they are allocated, non-runtime exceptions are wrapped in runtime exceptions, which avoids the need for a separate method that does that.)

Upvotes: 10

Stuart Marks

Reputation: 132350

There are two issues here: handling of checked exceptions such as IOException, and timely closing of resources.

None of the predefined functional interfaces declare any checked exceptions, which means that they have to be handled within the lambda, or wrapped in an unchecked exception and rethrown. It looks like your runtimizeException function does that. You probably also had to declare your own functional interface for it. As you've probably discovered, this is a pain.
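To make the pain concrete, this is a minimal sketch of the kind of custom functional interface the answer alludes to (the names ThrowingFunction and unchecked are illustrative, not from any of the answers):

```java
import java.util.function.Function;

public class Throwing {

    // Unlike java.util.function.Function, this interface is allowed
    // to declare checked exceptions in its single abstract method.
    @FunctionalInterface
    public interface ThrowingFunction<T, R> {
        R apply(T t) throws Exception;
    }

    // Adapts a throwing function to a plain Function by wrapping
    // checked exceptions in RuntimeException, so it can be passed
    // to Stream.map and friends.
    public static <T, R> Function<T, R> unchecked(ThrowingFunction<T, R> f) {
        return t -> {
            try {
                return f.apply(t);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }
}
```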

On the closing of resources like files, there was some investigation of having streams be closed automatically when the end of the stream was reached. This would be convenient, but it doesn't deal with closing when an exception is thrown. There's no magic do-the-right-thing mechanism for this in streams.

We're left with the standard Java techniques of dealing with resource closure, namely the try-with-resources construct introduced in Java 7. TWR really wants to have resources be closed at the same level in the call stack as they were opened. The principle of "whoever opens it has to close it" applies. TWR also deals with exception handling, which usually makes it convenient to deal with exception handling and resource closing in the same place.

In this example, the stream is somewhat unusual in that it maps a Stream<Path> to a Stream<Stream<String>>. These nested streams are the ones that aren't closed, resulting in the eventual exception when the system runs out of open file descriptors. What makes this difficult is that files are opened by one stream operation and then passed downstream; this makes it impossible to use TWR.

An alternative approach to structuring this pipeline is as follows.

The Files.lines call is the one that opens the file, so this has to be the resource in the TWR statement. The processing of this file is where (some) IOExceptions get thrown, so we can do the exception wrapping in the same TWR statement. This suggests having a simple function that maps the path to a line count, while handling resource closing and exception wrapping:

long lineCount(Path path) {
    try (Stream<String> s = Files.lines(path, StandardCharsets.ISO_8859_1)) {
        return s.count();
    } catch (IOException ioe) {
        throw new UncheckedIOException(ioe);
    }
}

Once you have this helper function, the main pipeline looks like this:

Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"),
           100, (path, attr) -> path.toString().endsWith(".html"))
     .mapToLong(this::lineCount)
     .forEachOrdered(System.out::println);

Upvotes: 24

fge

Reputation: 121702

Here is an alternative which uses another method from Files and will avoid leaking file descriptors:

Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"),
    100, (path, attr) -> path.toString().endsWith(".html"))
    .map(file -> runtimizeException(() -> Files.readAllLines(file, StandardCharsets.ISO_8859_1).size()))
    .forEachOrdered(System.out::println);

Unlike your version, it will return an int instead of a long for the line count; but you don't have files with that many lines, do you?

Upvotes: 0

nosid

Reputation: 50034

The close method of the interface AutoCloseable should only be called once. See the documentation of AutoCloseable for more information.

If terminal operations closed the stream automatically, close might be invoked twice. Take a look at the following example:

try (Stream<String> lines = Files.lines(path)) {
    lines.count();
}

As it is defined right now, the close method on lines will be invoked exactly once, regardless of whether the terminal operation completes normally or is aborted with an IOException. If the stream were instead closed implicitly by the terminal operation, close would be called once if an IOException occurs, and twice if the operation completes successfully.
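The single-close behaviour can be observed with Stream.onClose, which registers a handler that runs when close() is called. This small sketch counts close invocations for a pipeline shaped like the example above (Stream.of stands in for Files.lines so the sketch is self-contained):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class CloseOnce {

    public static int closeCountAfterTryWithResources() {
        AtomicInteger closes = new AtomicInteger();
        // onClose registers a handler invoked when close() is called.
        try (Stream<String> lines = Stream.of("a", "b")
                .onClose(closes::incrementAndGet)) {
            lines.count(); // the terminal operation does NOT close the stream
        } // try-with-resources closes it exactly once here
        return closes.get();
    }

    public static void main(String[] args) {
        System.out.println(closeCountAfterTryWithResources());
    }
}
```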

Upvotes: 3

skiwi

Reputation: 69259

You will need to call close() in this stream operation, which will cause all underlying close handlers to be called.

Better yet, would be to wrap your whole statement in a try-with-resources block, as then it will automagically call the close handler.

This may not be a possibility in your situation, which means that you will need to handle it yourself in some operation. Your current methods may not be suited for streams at all.

It seems like you indeed need to do it in your second map() operation.
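One way to read this advice is the following sketch (my interpretation, not code from the answer): consume and close each inner stream inside that second map step, using try-with-resources so the close happens even if counting throws. Stream.of suppliers stand in for Files.lines so the sketch is self-contained:

```java
import java.util.stream.Stream;

public class CloseInMap {

    // Counts the elements of each inner stream, closing every inner
    // stream in the same map step that consumes it.
    public static long[] countAndClose(Stream<Stream<String>> streams) {
        return streams
            .mapToLong(s -> {
                // try-with-resources guarantees s.close() runs, even
                // if count() throws (e.g. an UncheckedIOException).
                try (Stream<String> res = s) {
                    return res.count();
                }
            })
            .toArray();
    }
}
```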

Upvotes: 4
