Reputation: 2395
What I would like to do is create a function which runs another function every second. The second function returns Observables<A>
and I want the first function to return Observables<A>
as well instead of Observable<Observable<A>>
for example:
private A calcA(){
...
return new A(...)
}
public Observable<A> getAs(){
return Observable.create( subscriber -> {
Bool condition = ...
do {
subscriber.onNext(calcA())
} while (condition)
subscriber.onComplete()
})
}
public Observable<A> pollAs(){
return Observable.create(subscriber -> {
do {
subscriber.onNext(getAs()) // Flatten here I guess
Thread.sleep(1000)
} while(true)
})
So I would like to do something similar (I tried to write this in a Java-ish way, but I will use Kotlin)
Upvotes: 0
Views: 611
Reputation: 8227
You don't need to use the flatMap()
operator to flatten the inner observable, since you only want to repeatedly subscribe to the same observable.
public Observable<A> getAs() {
return Observable.fromCallable( () -> calcA() )
.repeat()
.takeWhile( v -> !condition( v );
}
getAs()
will emit items until the condition has been reached. It will then complete.
public Observable<A> pollAs(){
return getAs()
.repeatWhen( completed -> completed.delay(1000, TimeUnit.MILLISECONDS) );
pollAs()
will continually resubscribe to the getAs()
observable, pausing for a second between each subscription.
Edit: I have uploaded a 6-month-duration example to https://pastebin.com/kSmi24GF It shows that you have to keep advancing the time for data to come out.
Upvotes: 2
Reputation: 2395
I came up with this solution:
public Observable<A> pollAs() {
return Observable.create(subscriber -> {
do {
getAs().subscribe(
{ subscriber.onNext(it) },
{ subscriber.onError(it) },
{ Thread.sleep(1000) }
)
} while (true)
})
}
I don't really like this one can someone show me a more convenient way?
Upvotes: 0