s.d
s.d

Reputation: 29436

Java Stream lifecycle callbacks

Does there exist an elegant way to register callbacks when the last element of the stream has been processed and the stream is "consumed" fully?

Especially when the stream source (like a DB cursor, an Iterator<T>, or a custom Supplier<T>) is finite and can explicitly know when there is no more data.

Example:

public Stream<Row> query(String sql){
   Connection c = dataSource.openConnection();
   Stream<Row> rows = MyDB.query(sql).stream();
   c.close();
   return rows;
}

Now, it's futile to immediately close the connection, rather it would be great to schedule connection closure when the stream is fully consumed.

I know there is onClose() API but this relies on consumers explicitly call close() on the stream.

Upvotes: 2

Views: 771

Answers (3)

s.d
s.d

Reputation: 29436

A utility that works mostly, except when the returned stream is not iterated to the end:

public <T> Stream<T> wrap(Stream<T> stream, Runnable onEnd) {
    final Object endSignal = new Object();
    return Stream.concat(stream, Stream.of(endSignal))
            .peek(i -> {
                if(i == endSignal){onEnd.run();}
            })
            .filter(i -> i != endSignal)
            .map(i -> (T) i);
}

Upvotes: 0

Seelenvirtuose
Seelenvirtuose

Reputation: 20648

You want to register a stream close handler:

public Stream<Row> query(String sql){
    Connection c = dataSource.openConnection();
    return MyDB.query(sql).stream().onClose(() -> c.close());
}

A caller of this method must excplicitely close the stream!

(Note: I left out exception handling here.)

Upvotes: 4

Andreas
Andreas

Reputation: 159165

Call onClose, and document that the returned stream must be closed by caller.

/**
 * The returned stream must be closed.
 */
public Stream<Row> query(String sql){
    Connection c = dataSource.openConnection();
    return MyDB.query(sql).stream().onClose(() -> {
        try {
            c.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    });
}

Upvotes: 4

Related Questions