Reputation: 1266
I'm new in rxjava and i have problem with Flowable.
When onComplete
method has called in flatMap
, onNext
method in subscriber not called. What's wrong here?. Have any suggestion?
ResourceSubscriber<List<Song>> subscriber = new ResourceSubscriber<List<Song>>() {
@Override
public void onStart() {
Log.e(TAG, "onStart");
mView.showProgress();
}
@Override
public void onNext(List<Song> songs) {
mView.onLocalMusicLoaded(songs);
mView.emptyView(songs.isEmpty());
Log.e(TAG, "onNext");
//Not call here after onComplete called
}
@Override
public void onError(Throwable t) {
mView.hideProgress();
Log.e(TAG, "onError: ", t);
}
@Override
public void onComplete() {
mView.hideProgress();
Log.e(TAG, "onComplete");
}
};
Disposable disposable = Flowable.just(cursor)
.flatMap(new Function<Cursor, Publisher<List<Song>>>() {
@Override
public Publisher<List<Song>> apply(@NonNull Cursor cursor) throws Exception {
final List<Song> songs = new ArrayList<>();
if (cursor != null && cursor.getCount() > 0) {
cursor.moveToFirst();
do {
Song song = cursorToMusic(cursor);
songs.add(song);
} while (cursor.moveToNext());
}
return Flowable.create(new FlowableOnSubscribe<List<Song>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception {
e.onNext(songs);
e.onComplete();
}
}, BackpressureStrategy.LATEST);
}
})
.doOnNext(new Consumer<List<Song>>() {
@Override
public void accept(@NonNull List<Song> songs) throws Exception {
Log.e(TAG, "onLoadFinished doOnNext: " + songs.size());
Collections.sort(songs, new Comparator<Song>() {
@Override
public int compare(Song left, Song right) {
return left.getDisplayName().compareTo(right.getDisplayName());
}
});
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(subscriber);
mCompositeDisposable.add(disposable);
Upvotes: 3
Views: 1706
Reputation: 69997
From the Javadoc of ResourceSubscriber:
The default
onStart()
requestsLong.MAX_VALUE
by default. Override the method to request a custom positive amount. Use the protectedrequest(long)
to request more items anddispose()
to cancel the sequence from within anonNext
implementation.
@Override
public void onStart() {
Log.e(TAG, "onStart");
mView.showProgress();
request(Long.MAX_VALUE);
}
Also you successfully found the Flowable.just
operator, why did you replicate it via Flowable.create
to emit the scalar sogs list?
Upvotes: 2