SRKS

Reputation: 218

RxJava break onNext

I have an observable that emits long values. Once I receive a value > 0, I need to stop the emissions and go straight to onCompleted. Is there a better way to do this than the hack I used below?

long id = -1L;
methodA()
    .subscribeOn(Schedulers.io())
    .flatMapObservable(list -> Observable.from(list))
    .flatMap(eachObject -> methodB(eachObject))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<Long>() {
        @Override
        public void onCompleted() {
            // some callbacks
        }

        @Override
        public void onError(Throwable e) {
            // just say error
        }

        @Override
        public void onNext(Long mId) {
            if (id < 0) {
                id = mId;
            }
        }
    });

Upvotes: 0

Views: 249

Answers (1)

flakes

Reputation: 23614

Use takeWhile

Returns an Observable that emits items emitted by the source Observable so long as a specified condition is true.

Parameters:

predicate - a function that evaluates an item emitted by the source Observable and returns a Boolean

Returns:

an Observable that emits the items from the source Observable so long as each item satisfies the condition defined by predicate

public Observable<T> takeWhile(final Func1<T, Boolean> predicate)

methodA()
    .subscribeOn(Schedulers.io())
    .flatMapObservable(Observable::from)
    .flatMap(this::methodB)
    .observeOn(AndroidSchedulers.mainThread())
    .takeWhile(id -> id <= 0)
    .subscribe(...);
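
One thing to watch with the snippet above: takeWhile stops at the first item that fails the predicate and does not emit that item. With id -> id <= 0, the first positive id ends the stream but never reaches onNext. The sketch below shows that cutoff rule in plain Java rather than RxJava, using Stream.takeWhile (Java 9+), which follows the same rule. TakeWhileSketch and its method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class, not part of RxJava.
class TakeWhileSketch {

    // Same cutoff rule as takeWhile(id -> id <= 0): keep items while the
    // predicate holds, stop at the first failure, and drop the failing item.
    static List<Long> takeWhileNonPositive(List<Long> ids) {
        return ids.stream()
                .takeWhile(id -> id <= 0)
                .collect(Collectors.toList());
    }

    // Inclusive variant: stop at the first positive id, but keep it.
    static List<Long> takeUntilPositive(List<Long> ids) {
        List<Long> out = new ArrayList<>();
        for (Long id : ids) {
            out.add(id);
            if (id > 0) {
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> ids = List.of(-1L, 0L, 5L, 7L);
        System.out.println(takeWhileNonPositive(ids)); // [-1, 0]
        System.out.println(takeUntilPositive(ids));    // [-1, 0, 5]
    }
}
```

If the subscriber actually needs the positive id, RxJava 1.x also has takeUntil with a stop predicate, which as far as I know emits the matching item and then completes, so .takeUntil(id -> id > 0) in place of the takeWhile line may be closer to what the question asks for.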

Upvotes: 2
