Reputation: 9289
I would like to create an Observable that emits once both ReplaySubject instances hold a value. I tried it like this:
let v1 = new Rx.ReplaySubject();
let v2 = new Rx.ReplaySubject();
v1.next('X');
const combo = Rx.Observable.forkJoin(v1, v2)
combo.subscribe( arr => console.log(arr) )
v1.subscribe( s => console.log('v1', s) );
v2.subscribe( s => console.log('v2', s) );
v2.next('Y');
Both the v1.subscribe and v2.subscribe callbacks are called, but the combo.subscribe callback is not. What am I doing wrong? How can I fix this?
Upvotes: 2
Views: 1355
Reputation: 7916
forkJoin emits a single array containing the last value of each input observable, and only once all of them have completed. If you manually complete v1 and v2, combo will emit:
let v1 = new rxjs.ReplaySubject();
let v2 = new rxjs.ReplaySubject();
v1.next('X');
v2.next('Y');
v2.next('Z'); // Extra value: forkJoin keeps only the last value of v2, so 'Z' is emitted instead of 'Y'
const combo = rxjs.forkJoin(v1, v2)
combo.subscribe( arr => console.log(arr) )
v1.complete();
v2.complete();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.2/rxjs.umd.js"></script>
You might be looking for combineLatest instead if you don't want to complete the input observables:
let v1 = new rxjs.ReplaySubject();
let v2 = new rxjs.ReplaySubject();
v1.next('X');
v2.next('Y');
const combo = rxjs.combineLatest(v1, v2)
combo.subscribe( arr => console.log(arr) )
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.2/rxjs.umd.js"></script>
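To make the combineLatest rule concrete for the ordering used in the question (v1 gets its value before the subscription, v2 after), here is a rough dependency-free model. TinyReplay and tinyCombineLatest are hypothetical names made up for this sketch; they are not part of RxJS and skip completion, errors and unsubscription entirely.

```javascript
// Simplified stand-in for a ReplaySubject that replays every value.
// Hypothetical helper, NOT the RxJS implementation.
class TinyReplay {
  constructor() {
    this.buffer = [];     // every value pushed so far
    this.listeners = [];  // current subscribers
  }
  next(value) {
    this.buffer.push(value);
    this.listeners.forEach(fn => fn(value));
  }
  subscribe(fn) {
    this.buffer.forEach(fn);  // replay earlier values to the new subscriber
    this.listeners.push(fn);
  }
}

// Hypothetical helper mimicking the combineLatest rule: emit an array of the
// latest values whenever any source emits, once every source has emitted.
function tinyCombineLatest(sources, callback) {
  const latest = new Array(sources.length);
  const seen = sources.map(() => false);
  sources.forEach((source, i) => {
    source.subscribe(value => {
      latest[i] = value;
      seen[i] = true;
      if (seen.every(Boolean)) callback(latest.slice());
    });
  });
}

const v1 = new TinyReplay();
const v2 = new TinyReplay();
const out = [];

v1.next('X');                                    // before subscribing
tinyCombineLatest([v1, v2], pair => out.push(pair));
const emittedBeforeY = out.length;               // 0: v2 has no value yet
v2.next('Y');                                    // now both hold a value
console.log(emittedBeforeY, out);                // 0 and one emission: ['X', 'Y']
```

Nothing has to complete here: the replayed 'X' is picked up on subscription, and the first combined value appears as soon as v2 emits.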
Upvotes: 5
Reputation: 4641
forkJoin waits for all input observables to complete, then emits the last value emitted by each. You have to complete both subjects.
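As an illustration of that rule, here is a small dependency-free sketch. TinySubject and tinyForkJoin are hypothetical names invented for this example and only approximate the behaviour described above; the real forkJoin also handles errors and sources that complete without emitting.

```javascript
// Simplified stand-in for a subject that remembers its last value.
// Hypothetical helper, NOT the RxJS implementation.
class TinySubject {
  constructor() {
    this.last = undefined;  // most recent value
    this.hasValue = false;
    this.done = false;
    this.onComplete = [];   // callbacks to run on complete()
  }
  next(value) {
    if (this.done) return;  // ignore values after completion
    this.last = value;
    this.hasValue = true;
  }
  complete() {
    if (this.done) return;
    this.done = true;
    this.onComplete.forEach(cb => cb());
  }
}

// Hypothetical helper mimicking the forkJoin rule: emit the last value of
// each source, but only after every source has completed.
function tinyForkJoin(sources, callback) {
  const check = () => {
    if (sources.every(s => s.done && s.hasValue)) {
      callback(sources.map(s => s.last));
    }
  };
  sources.forEach(s => s.onComplete.push(check));
  check(); // covers sources that had all completed before this call
}

const a = new TinySubject();
const b = new TinySubject();
const emitted = [];
tinyForkJoin([a, b], arr => emitted.push(arr));

a.next('X');
b.next('Y');
b.next('Z');
const beforeComplete = emitted.length;    // 0: values alone are not enough
a.complete();
const afterFirstComplete = emitted.length; // 0: b is still open
b.complete();
console.log(beforeComplete, afterFirstComplete, emitted); // 0, 0, one emission: ['X', 'Z']
```

This is why the code in the question never logs from combo: both subjects receive values, but neither is ever completed.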
Upvotes: 2