godzsa

Reputation: 2395

Flatten Observable of Observables

I would like to create a function that runs another function every second. The second function returns an Observable<A>, and I want the first function to return an Observable<A> as well, instead of an Observable<Observable<A>>.

For example:

private A calcA(){
   ...
   return new A(...)
}

public Observable<A> getAs(){
   return Observable.create( subscriber -> {
      Bool condition = ...
      do {
         subscriber.onNext(calcA())
      } while (condition)
      subscriber.onComplete()
   })
}

public Observable<A> pollAs(){
   return Observable.create(subscriber -> {
      do {
         subscriber.onNext(getAs()) // Flatten here I guess
         Thread.sleep(1000)
      } while(true)
   })
}

So I would like to do something similar. (I tried to write this in a Java-ish way, but I will use Kotlin.)

Upvotes: 0

Views: 611

Answers (2)

Bob Dalgleish

Reputation: 8227

You don't need to use the flatMap() operator to flatten the inner observable, since you only want to repeatedly subscribe to the same observable.

public Observable<A> getAs() {
   return Observable.fromCallable( () -> calcA() )
            .repeat()
            .takeWhile( v -> !condition( v ) );
}

getAs() will emit items until the condition has been reached. It will then complete.
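The emit-until-condition behaviour can be tried without RxJava. The sketch below is a stand-in rather than the code above: it uses java.util.stream (Java 9+ for takeWhile), with Stream.iterate playing the role of fromCallable(...).repeat(). The class name TakeWhileSketch, the counter-based calcA and the limit-based condition are assumptions made up for illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class TakeWhileSketch {
    // Hypothetical stand-in for calcA(): returns 1, 2, 3, ... on successive calls.
    static int calcA(AtomicInteger counter) {
        return counter.incrementAndGet();
    }

    // Hypothetical stop condition: true once the value reaches the limit.
    static boolean condition(int value, int limit) {
        return value >= limit;
    }

    // Stream analogue of fromCallable(calcA).repeat().takeWhile(v -> !condition(v)).
    static List<Integer> getAs(int limit) {
        AtomicInteger counter = new AtomicInteger();
        // Stream.iterate gives an ordered stream, so takeWhile keeps a prefix of it.
        return Stream.iterate(calcA(counter), previous -> calcA(counter))
                .takeWhile(v -> !condition(v, limit)) // stop at the first value that meets the condition
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(getAs(4)); // prints [1, 2, 3]
    }
}
```

As with takeWhile in RxJava, the value that first meets the condition is dropped, not emitted.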

public Observable<A> pollAs(){
   return getAs()
            .repeatWhen( completed -> completed.delay(1000, TimeUnit.MILLISECONDS) );
}

pollAs() will continually resubscribe to the getAs() observable, pausing for a second between each subscription.
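To see what "resubscribe, pause, and still get a flat sequence" means, here is a blocking, library-free sketch. It is not RxJava: it uses java.util.stream's flatMap to merge one inner stream per round into a single result, and it runs a fixed number of rounds, whereas repeatWhen keeps resubscribing. The class name PollSketch, the rounds and pauseMillis parameters, and the sample source in main are assumptions made up for illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class PollSketch {
    // Sleeps for the given time, restoring the interrupt flag if interrupted.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Calls getAs once per round, pausing between rounds, and flattens
    // the per-round streams into one list of items.
    static <A> List<A> pollAs(Supplier<Stream<A>> getAs, int rounds, long pauseMillis) {
        return IntStream.range(0, rounds)
                .boxed()
                .flatMap(round -> {
                    if (round > 0) pause(pauseMillis); // wait before every round except the first
                    return getAs.get();                // a fresh inner stream for this round
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical source: call n yields "n-a" and "n-b".
        Supplier<Stream<String>> getAs = () -> {
            int n = calls.incrementAndGet();
            return Stream.of(n + "-a", n + "-b");
        };
        System.out.println(pollAs(getAs, 3, 10)); // prints [1-a, 1-b, 2-a, 2-b, 3-a, 3-b]
    }
}
```

The result type is List<A>, not List<List<A>>, which mirrors pollAs() returning Observable<A> instead of Observable<Observable<A>>.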

Edit: I have uploaded an example to https://pastebin.com/kSmi24GF (the paste is set to expire after 6 months). It shows that you have to keep advancing the time for data to come out.

Upvotes: 2

godzsa

Reputation: 2395

I came up with this solution:

public Observable<A> pollAs() {
   return Observable.create(subscriber -> {
       do {
           getAs().subscribe(
                   { subscriber.onNext(it) },
                   { subscriber.onError(it) },
                   { Thread.sleep(1000) }
           )
       } while (true)
   })
}

I don't really like this one. Can someone show me a more convenient way?

Upvotes: 0
