Reputation: 19353
I am trying to create a stream that polls a network service. At the moment it queries the service, then completes after a short delay. I'd like the onward stream to restart rather than complete, thereby polling the service forever.
You could do something like ...
myPollingStream.repeat()
But repeat in RxSwift is actually repeatElement, and so it generates a stream of observables. You could possibly concatMap these into a flattened serial sequence, but RxSwift does not have the concatMap operator.
So how do I loop an observable in RxSwift?
I'd like the requests to be sequential, not concurrent, so flatMap is not an option since it merges streams, leading to overlapping requests. I'm looking for something similar to how retry() works, but restarting on onCompleted, not onError.
Upvotes: 6
Views: 6599
Reputation: 13651
Observable.repeatElement(myPollingStream, scheduler: MainScheduler.instance).concat()
repeatElement(_:scheduler:) will create an infinite stream of polling queries. concat() will then make sure each polling query is completed before subscribing to the next.
While the above works in theory, without a backpressure implementation, repeatElement(_:scheduler:) will emit events until you eventually run out of memory. This makes this solution not viable as of RxSwift 3.0. More details can be found in this issue on the RxSwift repository.
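One way to keep the sequential behaviour without an infinite producer is to build each repetition lazily. The following is only a minimal sketch, not an RxSwift API: repeatOnCompleted is a hypothetical helper name, and it assumes a RxSwift version where the associated type is called Element (older releases call it E). It relies on the existing deferred and concat operators.

```swift
import RxSwift

extension ObservableType {
    /// Hypothetical helper (not part of RxSwift): resubscribes to `self`
    /// each time it completes, so subscriptions never overlap.
    func repeatOnCompleted() -> Observable<Element> {
        let source = self.asObservable()
        // `deferred` creates the next repetition only when `concat`
        // subscribes to it, i.e. after the current one has completed,
        // so no queue of pending streams builds up.
        return source.concat(Observable<Element>.deferred {
            source.repeatOnCompleted()
        })
    }
}
```

With this in place, myPollingStream.repeatOnCompleted() would restart the poll after every completion. An error still terminates the whole sequence, so combine it with retry() if errors should restart the poll as well.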
Upvotes: 8
Reputation: 2326
Option 1: Recursive function
Your myPollingStream:
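func myPollingStream() -> Observable<Result> {
    // The element type must match the declared return type (Result is your own response type)
    return Observable<Result>.create { observer in
        // your network code here: emit the result with onNext, then call onCompleted
        return Disposables.create()
    }
}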
Then you create a recursive function:
func callMyPollingStream() {
    myPollingStream()
        .subscribe(
            onNext: { result in
                // handle the polled result here
            },
            onCompleted: {
                // the previous poll has finished, so start the next one
                callMyPollingStream()
            })
        .addDisposableTo(db) // db is your DisposeBag
}
Option 2: Use interval
Observable<Int>
    .interval(5, scheduler: MainScheduler.instance)
    .subscribe(onNext: { _ in
        // start a new poll on every tick
        let _ = myPollingStream().subscribe()
    })
    .addDisposableTo(db)
With this option, myPollingStream() will be called every 5 seconds, whether or not the previous request has finished, so slow requests can overlap.
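If the interval approach is used but requests must never overlap, one possibility is RxSwift's flatMapFirst operator, which ignores new ticks while the previous inner sequence is still running. This is a rough sketch under assumptions: pollWithoutOverlap and its parameter names are hypothetical, and the tick source is passed in so that any interval can be used.

```swift
import RxSwift

/// Hypothetical helper: starts `request` on each tick, but drops ticks
/// that arrive while the previous request has not yet completed.
func pollWithoutOverlap<Tick, Value>(
    on ticks: Observable<Tick>,
    request: @escaping () -> Observable<Value>
) -> Observable<Value> {
    // flatMapFirst subscribes to a new inner observable only when
    // no earlier inner observable is still active.
    return ticks.flatMapFirst { _ in request() }
}
```

Here ticks would be the interval observable from Option 2 and request would be { myPollingStream() }. Note that a dropped tick means the next poll waits for the following tick, so polls are spaced by at least the interval rather than exactly by it.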
Upvotes: 0