Reputation: 313
So, I'm learning to think reactively thanks to RxJS. I'm currently reading an RxJS book (Reactive Programming with RxJS), and I've read about AsyncSubject and how it only caches the last value received. What I'd like to know is: what happens if I want to update the server and have this observable update? Since onCompleted has been called, do I need to create a brand-new observable? Is there another pattern I should follow?
My overall requirement is that I'd like to have a means to cleanly pass data to and from the server and keep my observable (model) fresh at all times.
Thanks, Lee
Upvotes: 4
Views: 296
Reputation: 2796
As you suspected, you cannot change the value of an AsyncSubject after onCompleted has been called. The typical way of dealing with a simple "call the server as required, cache the value between calls" scenario is to use flatMapLatest to map your trigger observable to an AsyncSubject representing your server call. For example, if you wanted to refresh some data every 30 seconds, you might do something like this:
const subscription = Rx.Observable
  .interval(30000)
  .flatMapLatest(() => serverCall())
  .subscribe(x => doStuffWithResult(x));
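To make the "cannot change the value after onCompleted" point concrete, here is a minimal sketch that models AsyncSubject-like behaviour in plain JavaScript. It is a hand-rolled illustration, not the Rx implementation: the class name TinyAsyncSubject and its fields are hypothetical, and error handling and disposal are left out. It only shows why each server call needs its own subject.

```javascript
// Hand-rolled model of AsyncSubject-like semantics. NOT the Rx implementation.
class TinyAsyncSubject {
  constructor() {
    this.callbacks = [];      // subscribers waiting for completion
    this.hasValue = false;
    this.lastValue = undefined;
    this.completed = false;
  }

  onNext(value) {
    if (this.completed) return; // values after completion are ignored
    this.hasValue = true;
    this.lastValue = value;     // only the most recent value is kept
  }

  onCompleted() {
    if (this.completed) return;
    this.completed = true;
    // Only now do waiting subscribers receive the cached last value.
    if (this.hasValue) this.callbacks.forEach(cb => cb(this.lastValue));
    this.callbacks = [];
  }

  subscribe(callback) {
    if (!this.completed) {
      this.callbacks.push(callback);
    } else if (this.hasValue) {
      callback(this.lastValue); // late subscribers get the cached value replayed
    }
  }
}

const seen = [];

// First "server call": two values arrive, but only the last one is delivered.
const firstCall = new TinyAsyncSubject();
firstCall.subscribe(v => seen.push('first:' + v));
firstCall.onNext('a');
firstCall.onNext('b');
firstCall.onCompleted();
firstCall.onNext('c');                            // ignored: already completed
firstCall.subscribe(v => seen.push('late:' + v)); // replays 'b'

// Refreshing means a new subject for the new call, not reusing the old one.
const secondCall = new TinyAsyncSubject();
secondCall.subscribe(v => seen.push('second:' + v));
secondCall.onNext('c');
secondCall.onCompleted();

console.log(seen);
```

In this model the completed subject never sees 'c'; the refreshed value only shows up through the second subject. That is the role flatMapLatest plays in the snippet above: each tick maps to a fresh server-call observable and the subscriber follows the latest one.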
Upvotes: 2
Reputation: 8920
In your case I would consider using WebSockets, since you want your observable (model) to stay fresh at all times.
Something like:
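// Assumes `websocket` is an already-created WebSocket instance.
var source = Rx.Observable.create(function (observer) {
  websocket.onmessage = function (msg) {
    observer.onNext(msg);
  };
  websocket.onerror = function (error) {
    observer.onError(error);
  };
  websocket.onclose = function () {
    // RxJS 4 observers expose onCompleted(), which takes no arguments.
    observer.onCompleted();
  };
  // Detach the handlers when the subscription is disposed.
  return function () {
    websocket.onmessage = websocket.onerror = websocket.onclose = null;
  };
});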
Otherwise you can poll on an interval:
const subscription = Rx.Observable
  .interval(1000)
  .flatMapLatest(() =>
    // defer re-runs fetch on every (re)subscription, so retry issues a new request
    Rx.Observable.defer(() =>
      Rx.Observable.fromPromise(fetch( ... ).then(response => response.json())))
      .retry(5))
  .subscribe(response => response);
You can always use the retry operator to avoid giving up immediately if there is an error.
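As a rough illustration of what retrying means here, the sketch below models a "cold" source as a plain function that redoes its work each time it is subscribed to, and a retry wrapper that resubscribes on error. This is plain JavaScript, not the Rx operator: the names retry, flakyServerCall and maxAttempts are hypothetical, and maxAttempts counts total attempts in this model (how the library counts may differ between RxJS versions).

```javascript
// Model: a cold source is a function that does its work on every subscribe.
// retry() wraps such a source and resubscribes when it errors.
function retry(source, maxAttempts) {
  return function subscribe(observer) {
    let attempt = 0;
    function trySubscribe() {
      attempt += 1;
      source({
        onNext: value => observer.onNext(value),
        onError: err => {
          if (attempt < maxAttempts) {
            trySubscribe();        // resubscribe: the work is run again
          } else {
            observer.onError(err); // out of attempts: surface the error
          }
        },
        onCompleted: () => observer.onCompleted()
      });
    }
    trySubscribe();
  };
}

// Hypothetical server call that fails twice, then succeeds.
let calls = 0;
function flakyServerCall(observer) {
  calls += 1;
  if (calls < 3) {
    observer.onError(new Error('attempt ' + calls + ' failed'));
  } else {
    observer.onNext('data from attempt ' + calls);
    observer.onCompleted();
  }
}

const events = [];
retry(flakyServerCall, 5)({
  onNext: value => events.push('next:' + value),
  onError: err => events.push('error:' + err.message),
  onCompleted: () => events.push('completed')
});

console.log(events);
```

The subscriber in this sketch never sees the first two failures; it only receives the value from the third attempt. The same idea is why the source being retried has to redo the request on each subscription: retrying an already-rejected promise would just replay the same rejection.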
Upvotes: 0