Reputation: 9574
Vert.x is a reactive framework, yet its MongoDB client API does not seem reactive.
To read records from a collection I have to use the following code:
mongo.find(COLLECTION, new JsonObject(), results -> {
    List<JsonObject> objects = results.result();
    objects.stream().map(...) //... do something with it
});
Won't the line results.result() fetch the full result set into memory? As I understand it, a reactive client should wait for me to consume the data rather than buffer it somewhere.
With MongoDB's own reactive driver, the above code would have been:
mongoDb.getCollection(COLLECTION).find().subscribe(new Subscriber<Document>() {
    private Subscription subs;

    @Override
    public void onComplete() {
        // Do something
    }

    @Override
    public void onError(Throwable err) {
        // Handle it
    }

    @Override
    public void onNext(Document doc) {
        // Do something with the result doc
        subs.request(1);
    }

    @Override
    public void onSubscribe(Subscription subs) {
        this.subs = subs;
        subs.request(10);
    }
});
In the above case I did not have to use any buffer. What am I missing in Vert.x's implementation?
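To make the expectation concrete, here is a minimal plain-Java sketch of demand-driven consumption. It uses the JDK's java.util.concurrent.Flow interfaces (Java 9+) instead of Vert.x or the MongoDB driver, and the IteratorPublisher class and consumeFirst method are hypothetical names invented for illustration. It is single-threaded and skips parts of the Reactive Streams rules (for example, rejecting non-positive request values), so treat it as an illustration of the idea only.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.stream.IntStream;

class DemandDrivenSketch {

    /** Hypothetical publisher: takes an item from the source only when one was requested. */
    static final class IteratorPublisher implements Flow.Publisher<Integer> {
        private final Iterator<Integer> source;
        int pulled = 0; // number of items taken from the source so far

        IteratorPublisher(Iterator<Integer> source) {
            this.source = source;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) return;
                    demand += n;
                    if (emitting) return; // called from inside onNext: the outer loop keeps going
                    emitting = true;
                    while (demand > 0 && !done) {
                        if (source.hasNext()) {
                            demand--;
                            pulled++;
                            subscriber.onNext(source.next());
                        } else {
                            done = true;
                            subscriber.onComplete();
                        }
                    }
                    emitting = false;
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Consumes at most `limit` items and returns how many the source had to produce. */
    static int consumeFirst(int limit, int sourceSize) {
        IteratorPublisher publisher =
                new IteratorPublisher(IntStream.range(0, sourceSize).boxed().iterator());
        List<Integer> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subs;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subs = s;
                s.request(2); // initial demand
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
                if (received.size() == limit) subs.cancel(); else subs.request(1);
            }

            @Override
            public void onError(Throwable t) { }

            @Override
            public void onComplete() { }
        });
        return publisher.pulled;
    }

    public static void main(String[] args) {
        System.out.println(consumeFirst(5, 1000)); // prints 5
    }
}
```

Although the source holds 1000 items, only the 5 that were requested are ever taken from it. That is the behavior I would expect from a reactive client.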
Upvotes: 0
Views: 453
Reputation: 3004
Building on tsegismont's answer, mongoClient.findBatch combined with BufferUntilSubscriber from RxJava could yield much the same behavior. Please try the snippet below:
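// RxJava 1.x; note that BufferUntilSubscriber lives in an internal package
import rx.Subscriber;
import rx.internal.operators.BufferUntilSubscriber;

BufferUntilSubscriber<JsonObject> bufferedStream = BufferUntilSubscriber.create();
mongoClient.findBatch("COLLECTION NAME", new JsonObject(), savedDocuments -> {
    if (savedDocuments.failed()) {
        // propagate the failure to the subscriber
        bufferedStream.onError(savedDocuments.cause());
    } else if (savedDocuments.result() == null) {
        // findBatch signals the end of the results with a null document
        bufferedStream.onCompleted();
    } else {
        bufferedStream.onNext(savedDocuments.result());
    }
});
bufferedStream.subscribe(new Subscriber<JsonObject>() {
    @Override
    public void onStart() {
        // ask for the first document only
        request(1);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(JsonObject document) {
        // do something with the emitted document
        System.out.println(document);
        // request another item:
        request(1);
    }
});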
The only caveat is that the Subscriber's request does not work with a value greater than 1: even if you request, say, 10, it ends up fetching only the next result.
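Keep in mind that a bridge of this kind moves the buffering rather than removing it. Assuming the callback variant of findBatch offers no way to pause the source, every document that arrives before the subscriber asks for it has to be held somewhere. The plain-Java sketch below illustrates this; Bridge and simulate are hypothetical names, not part of Vert.x or RxJava, and a simple loop stands in for the Mongo client pushing documents.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class BufferingBridgeSketch {

    /** Hypothetical helper: holds pushed items until the consumer has asked for them. */
    static final class Bridge<T> {
        private final Queue<T> buffer = new ArrayDeque<>();
        private final Consumer<T> consumer;
        private long demand = 0;

        Bridge(Consumer<T> consumer) {
            this.consumer = consumer;
        }

        /** Called by the push-style source for every item, whether or not it was requested. */
        void push(T item) {
            buffer.add(item);
            drain();
        }

        /** Called by the consumer when it is ready for n more items. */
        void request(long n) {
            demand += n;
            drain();
        }

        /** Number of items currently waiting in memory. */
        int buffered() {
            return buffer.size();
        }

        private void drain() {
            while (demand > 0 && !buffer.isEmpty()) {
                demand--;
                consumer.accept(buffer.poll());
            }
        }
    }

    /** Pushes `pushed` items at a consumer that requested `initialDemand`; returns what is left buffered. */
    static int simulate(int pushed, long initialDemand) {
        List<String> delivered = new ArrayList<>();
        Bridge<String> bridge = new Bridge<>(delivered::add);
        bridge.request(initialDemand);
        for (int i = 0; i < pushed; i++) {
            bridge.push("doc-" + i); // stand-in for the client calling the handler
        }
        return bridge.buffered();
    }

    public static void main(String[] args) {
        System.out.println(simulate(100, 1)); // prints 99
    }
}
```

With a demand of 1 and 100 pushed documents, 99 of them sit in the buffer, so memory use still grows with the result set whenever the consumer is slower than the source.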
Upvotes: 0
Reputation: 9138
It's not well documented, but the Vert.x Mongo Client has a findBatch method:
JsonObject query = new JsonObject().put("author", "J. R. R. Tolkien");
mongoClient.findBatch("book", query, res -> {
    if (res.succeeded()) {
        if (res.result() == null) {
            System.out.println("End of research");
        } else {
            System.out.println("Found doc: " + res.result().encodePrettily());
        }
    } else {
        res.cause().printStackTrace();
    }
});
Upvotes: 1