AppleGrew
AppleGrew

Reputation: 9574

How do I stream result instead of fetching all into memory in Vertx MongoClient?

Vertx is a reactive framework. However, its MongoDB client API does not seem reactive.

To read records from a collection I have to use the following code:-

mongo.find(COLLECTION, new JsonObject(), results -> {
  List<JsonObject> objects = results.result();
  objects.stream().map(...) //... do something with it
});

Won't the line results.result() will fetch the full result set into memory? As per my understanding it should wait for me to consume the data and not buffer it somewhere.

In MongoDB's own reactive driver the above code would have been:-

mongoDb.getCollection(COLLECTION).find().subscribe(new Subscriber<Document>() {
    private Subscription subs;

            @Override
            public void onComplete() {
                // Do something
            }

            @Override
            public void onError(Throwable err) {
                // Handle it
            }

            @Override
            public void onNext(Document doc) {
                // Do something with the result doc
                subs.request(1);
            }

            @Override
            public void onSubscribe(Subscription subs) {
                this.subs = subs;
                subs.request(10);
            }
});

In the above case I did not have use any buffer. What am I missing in Vertx's implementation?

Upvotes: 0

Views: 453

Answers (2)

y ramesh rao
y ramesh rao

Reputation: 3004

Based on tsegismont's answer mongoClient.findBatch along with BufferUntilSubscriber from RxJava could yield very same behavior, please try out below snippet:

BufferUntilSubscriber<JsonObject> bufferedStream = BufferUntilSubscriber.create();

mongoClient.findBatch("COLLECTION NAME", new JsonObject(), savedDocuments -> {
    JsonObject result = savedDocuments.result();
    bufferedStream.onNext(result);
});

bufferedStream.subscribe(new Subscriber<JsonObject>() {
    @Override
    public void onStart() {
        request(1);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(JsonObject document) {
        // do something with the emitted item "n"
        System.out.println(document);
        // request another item:
        request(1);
    }
});

The only thing here is the request of the Subscriber does not work with request more than 1 even if you try to get say 10 it would end up fetching only the next result.

Upvotes: 0

tsegismont
tsegismont

Reputation: 9138

It's not well documented but the Vert.x Mongo Client has a findBatch method:

JsonObject query = new JsonObject().put("author", "J. R. R. Tolkien");
mongoClient.findBatch("book", query, res -> {
  if (res.succeeded()) {
    if (res.result() == null) {
      System.out.println("End of research");
    } else {
      System.out.println("Found doc: " + res.result().encodePrettily());
    }
  } else {
    res.cause().printStackTrace();
  }
});

Upvotes: 1

Related Questions