Alessandro Argentieri

Reputation: 3215

Java Async MongoDB driver and RxJava2 Observables

I'm studying reactive programming with RxJava2 and I have a question about its usage with an async database driver like MongoDB.

If I use the blocking MongoDB driver to read the first document of a collection, the approach would be this:

public class MyDao{
   ...
   public Document getFirstDocument(String collectionName){
      MongoCollection<Document> collection = database.getCollection(collectionName);
      return collection.find().first();
   }
}



public class MyService {
   ...
   public Observable<Document> getFirstOf(String collectionName){
       return Observable.just(myDao.getFirstDocument(collectionName)); 
   }
}

With MongoDB's async driver, however, a read operation returns void (not a Document or a Future) and delivers its result through a callback, for example:

collection.find().first(
        (document, throwable) -> {
            myService.myCallback(document);
        }
);

So, how can I pass the Document to MyService as an Observable?

public class MyDao{
   ...
   public void getFirstDocument(String collectionName){
      MongoCollection<Document> collection = database.getCollection(collectionName);
      collection.find().first(
        (document, throwable) -> {
            //SOME SORT OF CALLBACK
        }
     );
   }
}



public class MyService {
   ...
   public Observable<Document> getFirstOf(String collectionName){
       return ??????? 
   }
}

Upvotes: 0

Views: 769

Answers (1)

ConstOrVar

Reputation: 2085

When you use Observable.just() as in

public Observable<Document> getFirstOf(String collectionName){
    return Observable.just(myDao.getFirstDocument(collectionName)); 
}

it is equivalent to the following code:

public Observable<Document> getFirstOf(String collectionName){
    Document doc = myDao.getFirstDocument(collectionName);
    return Observable.just(doc); 
}

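Note that this is not async code: the DB request runs on the calling thread, and it runs as soon as getFirstOf() is called, before anyone subscribes. Wrapping the call in Observable.fromCallable() defers it until subscription; to also move it off the calling thread, add subscribeOn() with a suitable scheduler such as Schedulers.io():

public Observable<Document> getFirstOf(String collectionName){
    return Observable.fromCallable(() -> myDao.getFirstDocument(collectionName))
            .subscribeOn(Schedulers.io()); // run the blocking call on an I/O thread
}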
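
Deferring a call and moving it to another thread are two separate steps, and that can be seen without RxJava on the classpath. The following is a minimal sketch using only JDK classes: `Callable` is the same interface that `Observable.fromCallable` accepts, while `DeferredCallDemo`, `blockingRead` and the `io-worker` thread name are hypothetical names standing in for the DAO call and the scheduler.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class DeferredCallDemo {

    // Counts how many times the (pretend) database read has actually run.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for myDao.getFirstDocument(...):
    // records the invocation and reports which thread executed it.
    static String blockingRead() {
        calls.incrementAndGet();
        return Thread.currentThread().getName();
    }

    // Wrapping the read in a Callable defers it: nothing runs until call() is invoked.
    static Callable<String> deferred() {
        return DeferredCallDemo::blockingRead;
    }

    // Handing the Callable to an executor is what moves the work to another thread.
    static String runOnWorker(Callable<String> task) throws Exception {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        try {
            return worker.submit(task).get();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Callable<String> task = deferred();
        System.out.println(calls.get());       // 0: creating the task ran nothing
        System.out.println(task.call());       // executes on the caller's own thread
        System.out.println(runOnWorker(task)); // io-worker
    }
}
```

In RxJava terms, the `Callable` corresponds to what `fromCallable` stores, and the executor plays the part of the scheduler passed to `subscribeOn`.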

If you are using the async MongoDB driver and would like to wrap it in an Observable, you can write it like this:

public Observable<Document> getFirstDocument(String collectionName) {
    return Observable.create(emitter -> {
        MongoCollection<Document> collection = database.getCollection(collectionName);
        collection.find().first((document, throwable) -> {
            if (throwable != null) {
                // The driver reported a failure: propagate it downstream.
                emitter.onError(throwable);
                return;
            }
            if (document != null) {
                emitter.onNext(document);
            }
            // Complete even when the collection is empty, so subscribers never hang.
            emitter.onComplete();
        });
    });
}
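
A callback of this shape can report three outcomes: a document, an error, or neither when the collection is empty. A wrapper should terminate in all three cases. The sketch below checks that without a database; it is not the MongoDB or RxJava API. `AsyncFind` is a hypothetical stand-in for the driver's callback-style `first(...)`, and the JDK's `CompletableFuture` plays the role of the emitter so the example needs no extra dependencies.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the driver's callback-style read: the callback gets
// (result, null) on success, (null, error) on failure, (null, null) if nothing was found.
interface AsyncFind<T> {
    void first(BiConsumer<T, Throwable> callback);
}

final class CallbackBridge {

    // Bridges the callback into a future that terminates for every outcome.
    static <T> CompletableFuture<Optional<T>> firstAsFuture(AsyncFind<T> find) {
        CompletableFuture<Optional<T>> future = new CompletableFuture<>();
        find.first((result, error) -> {
            if (error != null) {
                future.completeExceptionally(error);          // failure path
            } else {
                future.complete(Optional.ofNullable(result)); // value, or empty if null
            }
        });
        return future;
    }

    public static void main(String[] args) {
        AsyncFind<String> found = cb -> cb.accept("doc-1", null);
        AsyncFind<String> empty = cb -> cb.accept(null, null);
        AsyncFind<String> failed = cb -> cb.accept(null, new IllegalStateException("boom"));

        System.out.println(firstAsFuture(found).join());                     // Optional[doc-1]
        System.out.println(firstAsFuture(empty).join());                     // Optional.empty
        System.out.println(firstAsFuture(failed).isCompletedExceptionally()); // true
    }
}
```

The error check comes first so that a failure is never mistaken for an empty result, and the empty case still completes the future instead of leaving the caller waiting.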

Upvotes: 1
