haoyu wang

Reputation: 1377

How to return a Stream and close the underlying source?

Here is the background. I have an operation that may scan many rows from HBase. Because the number of rows may be huge, I want to return a Stream of rows. The question is: how do I close the ResultScanner?

The method looks like this:

    public <T> Stream<T> getResultStream(String tableName,Scan scan, RowMapper<T> mapper){
        scan.setCaching(5000);//set number of rows to fetch for each rpc 
        Table table=this.getConnection().getTable(tableName);
        ResultScanner scanner = table.getScanner(scan);
        return StreamSupport.stream(scanner.spliterator(),false).map(mapper::mapRow);
        // scanner.close(); where to close it ?
    }

Clearly I cannot close the ResultScanner inside this method, because the stream only reads rows from it after the method has returned. Is there an elegant way to do it?
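To make the problem concrete, here is a small self-contained sketch of what goes wrong if the scanner is closed before returning. `FakeScanner` is a hypothetical stand-in for HBase's `ResultScanner` (an `Iterable` that refuses to iterate once closed); no HBase classes are used. `StreamSupport.stream` only wraps the spliterator, so rows are pulled when the caller runs a terminal operation, which is after `close()` has already happened.

```java
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class EagerCloseDemo {

    // Hypothetical stand-in for a scanner whose iterator fails once closed.
    static class FakeScanner implements Iterable<String>, AutoCloseable {
        private final List<String> rows;
        private boolean closed = false;

        FakeScanner(List<String> rows) { this.rows = rows; }

        @Override
        public Iterator<String> iterator() {
            Iterator<String> it = rows.iterator();
            return new Iterator<String>() {
                @Override public boolean hasNext() {
                    if (closed) throw new IllegalStateException("scanner already closed");
                    return it.hasNext();
                }
                @Override public String next() { return it.next(); }
            };
        }

        @Override public void close() { closed = true; }
    }

    // The tempting but broken version: close the scanner before returning.
    static Stream<String> getResultStream(FakeScanner scanner) {
        Stream<String> stream = StreamSupport.stream(scanner.spliterator(), false);
        scanner.close(); // runs now, before any row has been read
        return stream;
    }

    public static void main(String[] args) {
        Stream<String> stream = getResultStream(new FakeScanner(List.of("a", "b")));
        try {
            stream.forEach(System.out::println); // rows are only read here
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```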

Answers (2)

M. Justin

Reputation: 21315

You can use a try-with-resources statement when calling the method to automatically close the stream once the block completes:

    try (Stream<MyType> stream = getResultStream("myTable", myScan, myRowMapper)) {
        stream.forEach(result -> {
            // Do something with each result item
            System.out.println(result);
        });
    }
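For closing the stream to also release the scanner, the returned stream has to know about it. `BaseStream.onClose(Runnable)` registers a handler that runs when `close()` is called on the stream, which is exactly what try-with-resources does. The sketch below assumes a hypothetical `FakeScanner` and a plain `Function` in place of the question's HBase `ResultScanner` and `RowMapper`; with the real types the change would presumably amount to appending `.onClose(scanner::close)` to the pipeline before returning it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class OnCloseSketch {

    // Hypothetical stand-in for HBase's ResultScanner: iterable rows plus close().
    static class FakeScanner implements Iterable<String>, AutoCloseable {
        private final List<String> rows;
        boolean closed = false;

        FakeScanner(List<String> rows) { this.rows = rows; }

        @Override public Iterator<String> iterator() { return rows.iterator(); }

        @Override public void close() { closed = true; }
    }

    // Registers scanner::close as a close handler, so closing the returned
    // stream (e.g. via try-with-resources) also closes the scanner.
    static <T> Stream<T> getResultStream(FakeScanner scanner, Function<String, T> mapper) {
        return StreamSupport.stream(scanner.spliterator(), false)
                .map(mapper)
                .onClose(scanner::close);
    }

    public static void main(String[] args) {
        FakeScanner scanner = new FakeScanner(List.of("a", "b", "c"));
        List<String> seen = new ArrayList<>();
        try (Stream<String> stream = getResultStream(scanner, String::toUpperCase)) {
            stream.forEach(seen::add);
        }
        System.out.println(seen + " closed=" + scanner.closed);
    }
}
```

Because close handlers belong to the whole pipeline, closing the stream returned by a further `.map(...)` or `.filter(...)` in the caller also runs them.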

Stephen C

Reputation: 719376

There's a way.

First we observe that Stream implements AutoCloseable. So we can implement a Stream wrapper that closes the scanner instance in its close() method.

You could implement this by writing a Stream wrapper class by hand. The class simply needs to delegate all Stream API calls to a wrapped Stream instance. Its close() method also needs to close the ResultScanner, which you would pass in as an argument to the wrapper class's constructor.

It could look something like this:

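    import java.util.stream.Stream;

    public class MyStreamWrapper<T> implements Stream<T> {

        private final Stream<T> stream;
        private final AutoCloseable resource;

        public MyStreamWrapper(Stream<T> stream, AutoCloseable resource) {
            this.stream = stream;
            this.resource = resource;
        }

        @Override
        public void close() {
            try {
                this.stream.close();
            } finally {
                try {
                    this.resource.close();
                } catch (Exception e) {
                    // Stream.close() cannot throw checked exceptions,
                    // so rethrow anything from the resource unchecked
                    throw new RuntimeException(e);
                }
            }
        }

        // methods to delegate all other Stream API methods to this.stream
    }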

(Your IDE may be able to generate the skeletal delegating methods for you. Check your IDE's documentation.)

You could also implement this with a dynamic proxy created via java.lang.reflect.Proxy, or in some other way.
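A rough sketch of the dynamic-proxy variant, using only `java.lang.reflect.Proxy`: every call is forwarded to the wrapped stream, and `close()` additionally closes the resource. The helper name `closingStream` and the demo resource are made up for illustration, not part of any library.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.stream.Stream;

public class ProxyStreamSketch {

    // Returns a proxy that forwards every Stream call to `stream`, except close(),
    // which closes `stream` and then `resource` (even if stream.close() throws).
    @SuppressWarnings("unchecked")
    static <T> Stream<T> closingStream(Stream<T> stream, AutoCloseable resource) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("close") && method.getParameterCount() == 0) {
                try {
                    stream.close();
                } finally {
                    // A checked exception from here reaches the caller
                    // wrapped in an UndeclaredThrowableException.
                    resource.close();
                }
                return null;
            }
            try {
                return method.invoke(stream, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the delegate's own exception
            }
        };
        return (Stream<T>) Proxy.newProxyInstance(
                ProxyStreamSketch.class.getClassLoader(),
                new Class<?>[] {Stream.class},
                handler);
    }

    public static void main(String[] args) {
        AutoCloseable resource = () -> System.out.println("resource closed");
        try (Stream<String> rows = closingStream(Stream.of("a", "b"), resource)) {
            rows.map(String::toUpperCase).forEach(System.out::println);
        }
    }
}
```

Like a hand-written wrapper, this only helps if the caller closes the proxy itself: `closingStream(...).map(f)` returns the delegate's unwrapped stream, and closing that does not close the resource.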

Once you have implemented the wrapper class, you can wrap and return the primary stream; e.g.

    ResultScanner scanner = table.getScanner(scan);
    Stream<T> stream = 
         StreamSupport.stream(scanner.spliterator(), false)
                      .map(mapper::mapRow);
    return new MyStreamWrapper<>(stream, scanner);

To ensure that the scanner is actually closed, the result of this method should be assigned to a resource variable in a try-with-resources statement.

But it isn't elegant.
