lambdas

Reputation: 4088

Combine observables in the order of availability of their elements

I'm trying to build an operator like s.startWith(x), but a conditional one; let's call it s.startWithIfNothingAvailable(x). It should prefix the stream with x only if s has no elements available at the moment of subscription.

Let me illustrate the idea with an example.

s is a stream of reports from a server.

I think another way of solving this is to use something like .concat, but one that orders the observables by the availability of their elements.

For example, Observable.concatFirstAvailable(serverReport, emptyReport): if serverReport has no elements yet, switch to emptyReport and then go back to waiting on serverReport.

Upvotes: 2

Views: 61

Answers (2)

akarnokd

Reputation: 70007

You could merge with a delayed special report item:

// imitate infinite hot service
PublishSubject<Report> service = PublishSubject.create();

// special report indicating the service has no reports
Report NO_REPORT = new Report();

AtomicBoolean hasValue = new AtomicBoolean();

service
// we'll need the main value for both emission and control message
.publish(main ->
     // this will keep "listening" to main and allow a timeout as well
     main.mergeWith(
         // signal the empty report indicator
         Observable.just(NO_REPORT)
         // after some grace period so main can emit a real report
         .delay(100, TimeUnit.MILLISECONDS)
         // but if the main emits first, don't signal the empty report
         .takeUntil(main)
     )
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(report -> {
     if (report == NO_REPORT) {
         // even though onNext calls are serialized, NO_REPORT may still arrive after a real report
         if (!hasValue.get()) {
             // display empty report
         }
     } else {
         // this indicates a NO_REPORT should be ignored onward
         hasValue.set(true);
         // display normal report
     }
}, error -> {  /* show error */ });

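// 200 ms exceeds the 100 ms grace period, so the empty report should be shown first;
// try Thread.sleep(50) instead to let the real report arrive within the grace period
Thread.sleep(200);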
service.onNext(new Report());

Upvotes: 1

TmTron

Reputation: 19411

If I understand your request correctly, you could use startWith() followed by sample(), e.g. with a sample time of 50 ms.

If no report arrives within the first 50 ms, the startWith element is used and you render the empty state. Otherwise, the latest report is used.

sample() will also make sure that you don't update your UI too often: e.g. when the server sends 2 reports within the 50 ms, you don't want to render both, just the latest one.
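
To make the timing concrete, here is a minimal plain-JDK sketch of what startWith() followed by sample() does. It is a model of the behaviour, not RxJava code; in RxJava 2 naming the chain itself would presumably look like reports.startWith(EMPTY).sample(50, TimeUnit.MILLISECONDS). The class LatestSampler and its methods offer() and tick() are hypothetical names introduced for illustration: offer() plays the role of a report arriving, and tick() stands for one 50 ms sample boundary.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical helper (not part of RxJava): keeps only the latest value
 * seen since the previous tick, seeded with an initial "empty" marker.
 */
final class LatestSampler<T> {
    // Holds the most recent unsampled value, or null if nothing new arrived.
    private final AtomicReference<T> latest;

    // The seed models startWith(x): it is the first candidate to be sampled.
    LatestSampler(T seed) {
        this.latest = new AtomicReference<>(seed);
    }

    // Models an upstream emission: a newer value overwrites the pending one.
    void offer(T value) {
        latest.set(value);
    }

    // Models one sample boundary: hand out the pending value, if any, and clear it.
    Optional<T> tick() {
        return Optional.ofNullable(latest.getAndSet(null));
    }
}
```

If tick() is called before any offer(), it returns the seed, which corresponds to rendering the empty state. If offer(r1) and offer(r2) both happen before the next tick(), only r2 is returned. Note that with this approach even the first real report is held back until the next sample boundary.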

Upvotes: 0
