Ced

Reputation: 17417

Change the subscription state of an observable with another observable

If you run the code below, you'll see in the console that an xhr request is sent every second regardless of whether anything is subscribed to subject. I'd like to stop making those requests when nothing is subscribed to it.

// npm install rxjs
const Rx = require('rxjs/Rx');

let subject = new Rx.BehaviorSubject(null)
Rx.Observable.timer(0, 1000).subscribe(i => someApiCall(i));
// at this point an xhr request is sent every second

function someApiCall(i){
    // retrieve some data
    console.log("xhr request sent")
    subject.next(i);
}

// here we subscribe to the subject; xhr requests made before
// this subscription are wasted
let subscription;
setTimeout(() => subscription = subject.subscribe(i => console.log(i)),2500);
setTimeout(() => subscription.unsubscribe(),6000);
// now we have unsubscribed, but the xhr requests keep going

The reason I'm using a BehaviorSubject instead of subscribing directly to the observable is that I want the value from the last xhr request immediately when I resubscribe.

Upvotes: 2

Views: 113

Answers (2)

Ashley

Reputation: 568

https://acutmore.jsbin.com/bepiho/2/edit?js,console

const { Observable } = Rx;

function someApiCall(i){
  return Observable.create(observer => {
    console.log("xhr request sent")
    observer.next(i);
    observer.complete();
  });
}

const data = Rx.Observable.timer(0, 1000)
  // map each value to the values of someApiCall
  .mergeMap(i => someApiCall(i))
  // share the values through a replaySubject
  .publishReplay(1)
  // Only connect to the source when there is at least one subscriber
  .refCount();

data
  .take(5)
  .subscribe(v => console.log(v));

data
  .take(1)
  .subscribe(v => console.log(v));

Worth noting: this won't work as expected if every subscriber uses .take(1), because each one will receive the cached value from the ReplaySubject and then unsubscribe immediately, before a new xhr request is made.

i.e. something needs to stay subscribed long enough for the timer to keep firing.
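
To make the connect-on-first-subscriber behaviour concrete, here is a rough dependency-free sketch of the idea behind publishReplay(1).refCount(). It is not the RxJS implementation: refCountedReplay, connect and tick are hypothetical names made up for this illustration, and tick stands in for one firing of the timer so the example runs synchronously.

```javascript
// Hypothetical stand-in for publishReplay(1).refCount(); illustration only.
function refCountedReplay(connect) {
  const observers = new Set();
  let hasLast = false;   // whether a value has been cached yet
  let last;              // most recent value from the source
  let disconnect = null; // teardown for the current source connection

  return {
    subscribe(next) {
      const observer = { next };
      observers.add(observer);
      if (hasLast) next(last); // replay the cached value immediately
      if (observers.size === 1) {
        // First subscriber: connect to the source.
        disconnect = connect(value => {
          hasLast = true;
          last = value;
          observers.forEach(o => o.next(value));
        });
      }
      return {
        unsubscribe() {
          if (!observers.delete(observer)) return;
          if (observers.size === 0 && disconnect) {
            // Last subscriber left: stop the source.
            disconnect();
            disconnect = null;
          }
        }
      };
    }
  };
}

let requests = 0; // counts simulated xhr requests
let emit = null;  // non-null only while the source is connected

function connect(push) {
  emit = push;
  return () => { emit = null; };
}

// Stand-in for one timer tick; a request is only made while connected.
function tick(i) {
  if (!emit) return;
  requests += 1;
  emit(i);
}

const data = refCountedReplay(connect);
const seen = [];
const late = [];

tick(0);                                        // no subscribers: no request
const sub = data.subscribe(v => seen.push(v));
tick(1);
tick(2);
sub.unsubscribe();
tick(3);                                        // no subscribers again: no request
const lateSub = data.subscribe(v => late.push(v)); // receives the cached 2 at once

// requests is 2, seen is [1, 2], late is [2]
console.log(requests, seen, late);
```

Only the two ticks that happen while someone is subscribed count as requests, and the late subscriber still gets the last cached value straight away, which is the behaviour the question asks for.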

Upvotes: 1

Mark van Straten

Reputation: 9425

You should use .shareReplay(1) instead of a BehaviorSubject. That way the source stays lazy and the last value of your xhr call is cached.

const source = Rx.Observable.interval(1000)
  .mergeMap(i => doXhr())
  .shareReplay(1);

source.subscribe(console.log)

Upvotes: 1
