Reputation: 27
I am pretty new to RxJava and I need to create a repository with several data sources. It is complex for me because there are several smaller subtasks that I don't know how to implement with RxJava.
First, I have a self-written DAO that processes an InputStream and provides Items in the specified range. Currently it simply collects the data in a list and provides Maybe<List<Item>>, but I want to provide Items one by one using a Flowable. Also, several errors need to be propagated to the higher level (the DataSource), such as EndOfFile, which notifies the DataSource that the data is fully cached.
Dao.class:
List<Item> loadRange(int start, int number) throws ... {
    ...
    while (...) {
        ...
        // TODO: emit the item to a Flowable instead of adding it to the list
        resultList.add(new Item(...));
    }
    return resultList;
}
The Maybe<List<Item>> method is simply created with Maybe.fromCallable().
Please help me!
Upvotes: 0
Views: 785
Reputation: 9919
Something like this should work:
Flowable<Item> loadRange(int start, int number) {
    return Flowable.create(emitter -> {
        try {
            while (...) {
                emitter.onNext(new Item());
            }
            emitter.onComplete();
        } catch (IOException e) {
            emitter.onError(e);
        }
    }, BackpressureStrategy.BUFFER);
}
I assume that once the loop is done you want to complete, and that you want to send errors downstream rather than declare them on the method signature. You can also change the BackpressureStrategy to suit your use case, e.g. DROP, LATEST, etc.
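For illustration, a fuller version of the lambda variant might look like the sketch below. It assumes RxJava 2 (the `io.reactivex` packages; RxJava 3 moved them to `io.reactivex.rxjava3.core`), a hypothetical `Item` that wraps one line of text, and an `InputStream` passed in as a parameter; none of these come from the question. It also checks `emitter.isCancelled()` so that reading stops once the subscriber is gone.

```java
// Assumes RxJava 2 on the classpath.
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;

final class DaoSketch {

    // Hypothetical item type: one item per line of input.
    static final class Item {
        final String value;
        Item(String value) { this.value = value; }
    }

    // Skips the first `start` lines, then emits up to `number` items one by one.
    static Flowable<Item> loadRange(InputStream in, int start, int number) {
        return Flowable.create(emitter -> {
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                int index = 0;
                int emitted = 0;
                while (emitted < number
                        && !emitter.isCancelled()
                        && (line = reader.readLine()) != null) {
                    if (index++ >= start) {
                        emitter.onNext(new Item(line));
                        emitted++;
                    }
                }
                // Range exhausted or end of input reached.
                emitter.onComplete();
            } catch (IOException e) {
                // Read failures travel downstream instead of being thrown.
                emitter.onError(e);
            }
        }, BackpressureStrategy.BUFFER);
    }

    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream(
                "a\nb\nc\nd\n".getBytes(StandardCharsets.UTF_8));
        List<Item> items = loadRange(in, 1, 2).toList().blockingGet();
        for (Item item : items) {
            System.out.println(item.value); // prints b, then c
        }
    }
}
```

Note that with BUFFER the items are queued whenever the consumer is slower than the reader, so memory use can grow with the size of the range.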
As you're new to RxJava, the equivalent with an anonymous class would be:
Flowable<Item> loadRange(int start, int number) {
    return Flowable.create(new FlowableOnSubscribe<Item>() {
        @Override
        public void subscribe(FlowableEmitter<Item> emitter) {
            try {
                while (...) {
                    emitter.onNext(new Item());
                }
                emitter.onComplete();
            } catch (IOException e) {
                emitter.onError(e);
            }
        }
    }, BackpressureStrategy.BUFFER);
}
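If buffering the whole range is a concern, `Flowable.generate` may be worth a look as an alternative to `Flowable.create`: it calls the generator once per requested item, so the source is only read as fast as the subscriber consumes. The sketch below is a different technique from the code above, not a drop-in replacement. It assumes RxJava 2 and uses plain `String` lines from a hypothetical in-memory text in place of the `Item` parsing.

```java
// Assumes RxJava 2 on the classpath.
import io.reactivex.Flowable;

import java.io.BufferedReader;
import java.io.StringReader;

final class GenerateSketch {

    // Emits one line per downstream request.
    static Flowable<String> lines(String text) {
        return Flowable.generate(
                // Per-subscriber state: a fresh reader for each subscription.
                () -> new BufferedReader(new StringReader(text)),
                // Called once per requested item; must emit at most one signal.
                (reader, emitter) -> {
                    String line = reader.readLine(); // a thrown IOException becomes onError
                    if (line == null) {
                        emitter.onComplete();
                    } else {
                        emitter.onNext(line);
                    }
                },
                // Cleanup after completion, error, or cancellation.
                reader -> reader.close());
    }
}
```

For example, `GenerateSketch.lines("a\nb\nc").take(2)` reads only as far as it needs to before the reader is closed.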
Upvotes: 1