Reputation: 1377
Here is the background: I have an operation that may scan many rows from HBase. Because the number of rows may be huge, I want to return a Stream of rows. The question is: how do I close the ResultScanner?
The method looks like this:
public <T> Stream<T> getResultStream(String tableName, Scan scan, RowMapper<T> mapper) throws IOException {
    scan.setCaching(5000); // number of rows to fetch per RPC
    Table table = this.getConnection().getTable(tableName);
    ResultScanner scanner = table.getScanner(scan);
    return StreamSupport.stream(scanner.spliterator(), false).map(mapper::mapRow);
    // scanner.close(); where should this go?
}
Clearly I cannot close the ResultScanner in this method, because the caller consumes the stream only after the method has returned. Is there an elegant way to do this?
Upvotes: 0
Views: 123
Reputation: 21315
You can use a try-with-resources statement when calling the method to automatically close the stream once the block completes:
try (Stream<MyType> stream = getResultStream("myTable", myScan, myRowMapper)) {
    stream.forEach(result -> {
        // Do something with each result item
        System.out.println(result);
    });
}
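Note that closing a stream only runs the handlers registered on it with Stream.onClose(Runnable). A stream created by StreamSupport.stream(...) has no such handler, so for the try-with-resources block to actually release the scanner, getResultStream has to register one. Below is a minimal sketch under that assumption, using a hypothetical helper ClosingStreams.stream that accepts any source that is both Iterable and Closeable (HBase's ResultScanner is both):

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper, not part of HBase or the JDK.
final class ClosingStreams {
    private ClosingStreams() {}

    // Streams the elements of a source that is both Iterable and Closeable
    // (e.g. a ResultScanner) and closes the source when the stream is closed.
    static <R, S extends Iterable<R> & Closeable, T> Stream<T> stream(
            S source, Function<? super R, ? extends T> mapper) {
        return StreamSupport.stream(source.spliterator(), false)
                // Close handlers belong to the whole pipeline, so closing the
                // returned stream (or any stream derived from it) runs this one.
                .onClose(() -> {
                    try {
                        source.close();
                    } catch (IOException e) {
                        // Runnable cannot throw checked exceptions
                        throw new UncheckedIOException(e);
                    }
                })
                .map(mapper);
    }
}
```

With this helper, the last line of getResultStream could become `return ClosingStreams.stream(scanner, mapper::mapRow);`, assuming mapRow takes a single Result as the original code implies. The caller's try-with-resources block then closes the scanner as well as the stream.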
Upvotes: 0
Reputation: 719376
There's a way.
First, observe that Stream extends AutoCloseable (through BaseStream). So we can implement a Stream wrapper that closes the scanner instance in its close() method.
You could implement this by writing a Stream wrapper class by hand. The class simply needs to delegate all Stream API calls to a wrapped Stream instance. In the case of close(), it also needs to close the ResultScanner resource, which you would pass in as an argument to the wrapper class's constructor.
It could look something like this:
public class MyStreamWrapper<T> implements Stream<T> {
    private final Stream<T> stream;
    private final AutoCloseable resource;
    public MyStreamWrapper(Stream<T> stream, AutoCloseable resource) {
        this.stream = stream;
        this.resource = resource;
    }
    @Override
    public void close() {
        try {
            this.stream.close();
        } finally {
            try {
                this.resource.close();
            } catch (Exception e) {
                // Stream.close() cannot throw checked exceptions, so rethrow unchecked
                throw new RuntimeException(e);
            }
        }
    }
    // methods to delegate all other Stream API methods to this.stream
}
(Your IDE may be able to generate the skeleton of the wrapper class, including the delegating methods, to save you the effort. Check your IDE's documentation.)
You could also implement this with a dynamic proxy created using java.lang.reflect.Proxy, or in some other way.
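A minimal sketch of the dynamic-proxy variant, using a hypothetical class name ClosingStreamProxy. The invocation handler intercepts the no-argument close() method, closes the wrapped stream and then the resource, and forwards every other call to the wrapped stream, so no delegating methods have to be written by hand:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.stream.Stream;

// Hypothetical helper class, not part of any library.
final class ClosingStreamProxy {
    private ClosingStreamProxy() {}

    @SuppressWarnings("unchecked")
    static <T> Stream<T> wrap(Stream<T> stream, AutoCloseable resource) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("close") && method.getParameterCount() == 0) {
                // Close the stream first, then always release the resource
                try {
                    stream.close();
                } finally {
                    resource.close();
                }
                return null;
            }
            try {
                // Forward every other call (map, forEach, ...) unchanged
                return method.invoke(stream, args);
            } catch (InvocationTargetException e) {
                // Rethrow the exception thrown by the wrapped stream itself
                throw e.getCause();
            }
        };
        return (Stream<T>) Proxy.newProxyInstance(
                ClosingStreamProxy.class.getClassLoader(),
                new Class<?>[] {Stream.class},
                handler);
    }
}
```

Usage mirrors the hand-written wrapper: `return ClosingStreamProxy.wrap(stream, scanner);`, with the result consumed in a try-with-resources statement. One caveat applies to this sketch and to any wrapper that delegates naively: methods such as map or filter called on the wrapper return the underlying pipeline's streams rather than wrappers, so if the caller closes only such a derived stream, the resource's close() is never reached.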
Once you have implemented the wrapper class, you can wrap and return the primary stream; e.g.
ResultScanner scanner = table.getScanner(scan);
Stream<T> stream =
        StreamSupport.stream(scanner.spliterator(), false)
                .map(mapper::mapRow);
return new MyStreamWrapper<>(stream, scanner);
To ensure that the scanner is actually closed, the caller should assign the result of this method to a resource variable in a try-with-resources statement.
But it isn't elegant.
Upvotes: 0