Sinh Phan

Reputation: 1266

RxJava2 Flowable does not call the onNext method in Subscriber

I'm new to RxJava and I have a problem with Flowable. After onComplete is called on the emitter inside flatMap, the onNext method of my subscriber is never called. What is wrong here? Any suggestions?

    ResourceSubscriber<List<Song>> subscriber = new ResourceSubscriber<List<Song>>() {
        @Override
        public void onStart() {
            Log.e(TAG, "onStart");
            mView.showProgress();
        }

        @Override
        public void onNext(List<Song> songs) {
            mView.onLocalMusicLoaded(songs);
            mView.emptyView(songs.isEmpty());
            Log.e(TAG, "onNext");
            // Never called, even after onComplete has been called
        }

        @Override
        public void onError(Throwable t) {
            mView.hideProgress();
            Log.e(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            mView.hideProgress();
            Log.e(TAG, "onComplete");
        }
    };


    Disposable disposable = Flowable.just(cursor)
            .flatMap(new Function<Cursor, Publisher<List<Song>>>() {
                @Override
                public Publisher<List<Song>> apply(@NonNull Cursor cursor) throws Exception {
                    final List<Song> songs = new ArrayList<>();
                    if (cursor != null && cursor.getCount() > 0) {
                        cursor.moveToFirst();
                        do {
                            Song song = cursorToMusic(cursor);
                            songs.add(song);
                        } while (cursor.moveToNext());
                    }
                    return Flowable.create(new FlowableOnSubscribe<List<Song>>() {
                        @Override
                        public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception {
                            e.onNext(songs);
                            e.onComplete();
                        }
                    }, BackpressureStrategy.LATEST);
                }
            })
            .doOnNext(new Consumer<List<Song>>() {
                @Override
                public void accept(@NonNull List<Song> songs) throws Exception {
                    Log.e(TAG, "onLoadFinished doOnNext: " + songs.size());
                    Collections.sort(songs, new Comparator<Song>() {
                        @Override
                        public int compare(Song left, Song right) {
                            return left.getDisplayName().compareTo(right.getDisplayName());
                        }
                    });
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(subscriber);

    mCompositeDisposable.add(disposable);

Upvotes: 3

Views: 1706

Answers (1)

akarnokd

Reputation: 69997

Your onStart() override never requests any items, so nothing is delivered to onNext. From the Javadoc of ResourceSubscriber:

The default onStart() requests Long.MAX_VALUE by default. Override the method to request a custom positive amount. Use the protected request(long) to request more items and dispose() to cancel the sequence from within an onNext implementation.

    @Override
    public void onStart() {
        Log.e(TAG, "onStart");
        mView.showProgress();
        request(Long.MAX_VALUE);
    }
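
To see why the missing request matters, here is a minimal sketch of the same demand rule. It deliberately uses the JDK's java.util.concurrent.Flow interfaces (Java 9+) instead of RxJava so that it runs without any dependencies. SingleItemPublisher, RecordingSubscriber and DemandDemo are hypothetical names made up for this illustration; they are not RxJava classes and do not reproduce RxJava's internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class DemandDemo {

    /** Hypothetical publisher: holds one value and emits it only once demand arrives. */
    static final class SingleItemPublisher<T> implements Flow.Publisher<T> {
        private final T value;

        SingleItemPublisher(T value) {
            this.value = value;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return; // already emitted or cancelled
                    }
                    done = true;
                    if (n <= 0) {
                        subscriber.onError(new IllegalArgumentException("n must be positive"));
                        return;
                    }
                    subscriber.onNext(value);
                    subscriber.onComplete();
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Hypothetical subscriber: records every signal and requests only if told to. */
    static final class RecordingSubscriber<T> implements Flow.Subscriber<T> {
        final List<String> events = new ArrayList<>();
        private final boolean requestOnSubscribe;

        RecordingSubscriber(boolean requestOnSubscribe) {
            this.requestOnSubscribe = requestOnSubscribe;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            events.add("onSubscribe");
            if (requestOnSubscribe) {
                subscription.request(Long.MAX_VALUE); // the step missing from the question's onStart()
            }
        }

        @Override
        public void onNext(T item) {
            events.add("onNext:" + item);
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("onError");
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
        }
    }

    /** Subscribes once and returns the signals the subscriber received. */
    public static List<String> run(boolean requestOnSubscribe) {
        RecordingSubscriber<String> subscriber = new RecordingSubscriber<>(requestOnSubscribe);
        new SingleItemPublisher<>("songs").subscribe(subscriber);
        return subscriber.events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [onSubscribe]
        System.out.println(run(true));  // [onSubscribe, onNext:songs, onComplete]
    }
}
```

The subscriber that never calls request receives nothing after onSubscribe, which is the situation created by overriding onStart() without calling request(long).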

Also, you already found the Flowable.just operator, so why replicate it with Flowable.create to emit the single songs list? Flowable.just(songs) does the same thing.

Upvotes: 2
