Reputation: 3215
I'm studying reactive programming with RxJava2 and I have a question about its usage with an async database driver like MongoDB.
If I use blocking MongoDB driver to get a collection the approach would be this:
public class MyDao{
...
public Document getFirstDocument(String collectionName){
MongoCollection<Document> collection = database.getCollection(collectionName);
return collection.find().first();
}
}
public class MyService {
...
public Observable<Document> getFirstOf(String collectionName){
return Observable.just(myDao.getFirstDocument(collectionName));
}
}
Instead, working with the async Driver of MongoDB, my return type for a read operation is a void (and not a Document, or a Future)with a callback method inside, for example:
collection.find().first(
(document, throwable) -> {
myService.myCallback(document);
}
);
So, how can I pass my Observable Documents to MyService?
public class MyDao{
...
public void getFirstDocument(String collectionName){
MongoCollection<Document> collection = database.getCollection(collectionName);
collection.find().first(
(document, throwable) -> {
//SOME SORT OF CALLBACK
}
);
}
}
public class MyService {
...
public Observable<Document> getFirstOf(String collectionName){
return ???????
}
}
Upvotes: 0
Views: 769
Reputation: 2085
When you are using Observable.just()
in
public Observable<Document> getFirstOf(String collectionName){
return Observable.just(myDao.getFirstDocument(collectionName));
}
it equals to next code
public Observable<Document> getFirstOf(String collectionName){
Document doc = myDao.getFirstDocument(collectionName);
return Observable.just(doc);
}
You can notice that it's not async
code and request to DB is performed on calling thread. To make that code async
you need to rewrite it like that
public Observable<Document> getFirstOf(String collectionName){
return Observable.fromCallable(() -> myDao.getFirstDocument(collectionName));
}
If you are using async
MongoDB driver and would like to wrap it in Observable
, you can write in that way
public Observable<Document> getFirstDocument(String collectionName) {
return Observable.create(emitter -> {
MongoCollection<Document> collection = database.getCollection(collectionName);
collection.find().first((document, throwable) -> {
if(document != null) {
emitter.onNext(document);
emitter.onComplete();
} else if(throwable != null) {
emitter.onError(throwable);
}
});
});
}
Upvotes: 1