shados
shados

Reputation: 137

Making a lazy, cached observable that only execute the source once

I'm trying to use an rxjs observable to delegate, but share, a piece of expensive work across the lifetime of an application.

Essentially, something like:

var work$ = Observable.create((o) => {
  const expensive = doSomethingExpensive();
  o.next(expensive);
  observer.complete();
})
.publishReplay(1)
.refCount();

Now, this works fine and does exactly what I want, except for one thing: if all subscribers unsubscribe, then when the next one subscribes, my expensive work happens again. I want to keep it.

now, I could use a subject, or I could remove the refCount() and use connect manually (and never disconnect). But that would make the expensive work happen the moment I connect, not the first time a subscriber tries to consume work$.

Essentially, I want something akin to refCount that only looks at the first subscription to connect, and never disconnect. A "lazy connect".

Is such a thing possible at all?

Upvotes: 3

Views: 1458

Answers (1)

Mark van Straten
Mark van Straten

Reputation: 9425

How does publishReplay() actually work

It internally creates a ReplaySubject and makes it multicast compatible. The minimal replay value of ReplaySubject is 1 emission. This results in the following:

  • First subscription will trigger the publishReplay(1) to internally subscribe to the source stream and pipe all emissions through the ReplaySubject, effectively caching the last n(=1) emissions
  • If a second subscription is started while the source is still active the multicast() will connect us to the same replaySubject and we will receive all next emissions until the source stream completes.
  • If a subscription is started after the source is already completed the replaySubject has cached the last n emissions and it will only receive those before completing.

const source = Rx.Observable.from([1,2])
  .mergeMap(i => Rx.Observable.of('emission:'+i).delay(i * 100))
  .do(null,null,() => console.log('source stream completed'))
  .publishReplay(1)
  .refCount();

// two subscriptions which are both in time before the stream completes
source.subscribe(val => console.log(`sub1:${val}`), null, () => console.log('sub1 completed'));
source.subscribe(val => console.log(`sub2:${val}`), null, () => console.log('sub2 completed'));

// new subscription after the stream has completed already
setTimeout(() => {
  source.subscribe(val => console.log(`sub_late-to-the-party:${val}`), null, () => console.log('sub_late-to-the-party completed'));
}, 500);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

Upvotes: 4

Related Questions