Reputation: 2820
I have the following two subscriptions:
this.service1.source1.subscribe(countries => {
this.filteredData = [];
countries.forEach(country => {
this.filteredData = this.data.filter(user => {
return user.country == country;
});
});
});
this.service2.source2.subscribe(companies => {
companies.forEach(company => {
this.filteredData = [];
this.filteredData = this.data.filter(user => {
return user.company == company;
});
});
});
where source1 and source2 are BehaviorSubject instances, and both events are being caught by subscribe.
I want to combine both subscriptions because I want to filter the data depending on both results returned by subscribe. I tried forkJoin and zip:
zip([this.service1.source1, this.service1.source2]).subscribe(results => {
console.log('zip results:', results);
});
forkJoin([this.service1.source1, this.service1.source2]).subscribe(results => {
console.log('forkJoin results:', results);
});
But with those (zip & forkJoin) I noticed I am not even getting anything on the console, as if subscribe is not getting executed when events are emitted.
How can I combine the results of both sources into one subscription?
Upvotes: 2
Views: 13852
Reputation: 4874
forkJoin may not be suitable for your use case because it emits only the last value from each observable, and only once all of them COMPLETE.
zip also may not give you the desired behavior because it waits until every input stream has emitted its n-th value; as soon as that condition is met, it combines those n-th values and emits the n-th combined value.
So in either case you won't get an emission until there is an emission in both observables. Since this.service1.source1 and this.service1.source2 are BehaviorSubject instances, zip guarantees an initial emission, but later emissions will occur only when both observables emit again. forkJoin emits nothing at all until both subjects complete.
I suggest using combineLatest because, once every input stream has emitted at least once, it combines the latest values from each input stream whenever any of them emits.
combineLatest(this.service1.source1, this.service1.source2).subscribe(results => {
console.log('combineLatest results:', results);
});
And since this.service1.source1 and this.service1.source2 are BehaviorSubject instances, defined as:
A variant of Subject that requires an initial value and emits its current value whenever it is subscribed to.
it is guaranteed that you get an emission as soon as you subscribe and again whenever either observable emits a value.
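To make the difference between the three operators concrete, here is a small dependency-free model of their pairing rules. It is only a sketch and does not use RxJS itself; the names SourceEvent, modelZip, modelCombineLatest and modelForkJoin are made up for this illustration, and the timeline is an invented example in which the first source emits more often than the second.

```typescript
// A model of the emission rules described above, NOT the RxJS implementation.
// All names here are hypothetical and exist only for this illustration.
type SourceEvent<T> = { source: 0 | 1; value: T };

// zip: the n-th output pairs the n-th value of each source.
function modelZip<T>(events: SourceEvent<T>[]): [T, T][] {
  const queues: [T[], T[]] = [[], []];
  const out: [T, T][] = [];
  for (const e of events) {
    queues[e.source].push(e.value);
    if (queues[0].length > 0 && queues[1].length > 0) {
      out.push([queues[0].shift() as T, queues[1].shift() as T]);
    }
  }
  return out;
}

// combineLatest: once both sources have emitted, every new value
// produces an output built from the latest value of each source.
function modelCombineLatest<T>(events: SourceEvent<T>[]): [T, T][] {
  const latest: [T | undefined, T | undefined] = [undefined, undefined];
  const seen = [false, false];
  const out: [T, T][] = [];
  for (const e of events) {
    latest[e.source] = e.value;
    seen[e.source] = true;
    if (seen[0] && seen[1]) {
      out.push([latest[0] as T, latest[1] as T]);
    }
  }
  return out;
}

// forkJoin: a single output with the last values, and only if both completed.
function modelForkJoin<T>(events: SourceEvent<T>[], bothCompleted: boolean): [T, T][] {
  if (!bothCompleted) {
    return [];
  }
  const all = modelCombineLatest(events);
  return all.length > 0 ? [all[all.length - 1]] : [];
}

// Invented timeline: initial values a0 and b0, then two updates on the
// first source before the second source emits again.
const timeline: SourceEvent<string>[] = [
  { source: 0, value: 'a0' },
  { source: 1, value: 'b0' },
  { source: 0, value: 'a1' },
  { source: 0, value: 'a2' },
  { source: 1, value: 'b1' },
];

console.log('zip:', modelZip(timeline));                     // 2 pairs
console.log('combineLatest:', modelCombineLatest(timeline)); // 4 pairs
console.log('forkJoin:', modelForkJoin(timeline, false));    // nothing, never completed
```

In this model zip stays silent for a1 and a2 until b1 arrives, while combineLatest reacts to every change, which is the behavior you want when either filter can change on its own.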
Upvotes: 5
Reputation: 39482
Syntax is wrong. Both forkJoin and zip accept Observables as arguments, NOT an array of Observables. Remove the [] from the call and you should be fine.
Try this:
zip(this.service1.source1, this.service1.source2).subscribe(results => {
console.log('zip results:', results);
});
forkJoin(this.service1.source1, this.service1.source2).subscribe(results => {
console.log('forkJoin results:', results);
});
Upvotes: 2
Reputation: 1889
As @ysf pointed out above, if you want to filter the local data with both results whenever either source emits, you should go for combineLatest.
Additionally, I'd recommend exposing each BehaviorSubject from its service as an Observable instead of exposing the subject itself. Like:
// at Service1.service.ts
import { BehaviorSubject, Observable } from 'rxjs';
export class Service1Service {
// Do not expose Subject directly.
private source1 = new BehaviorSubject<YourDataType[]>(YourInitialValue);
private source2 = new BehaviorSubject<YourDataType[]>(YourInitialValue);
source1$: Observable<YourDataType[]>;
source2$: Observable<YourDataType[]>;
constructor() {
// Create an Observable instance for your sake and open it to the public.
this.source1$ = this.source1.asObservable();
this.source2$ = this.source2.asObservable();
}
}
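// at wherever you subscribe
import { combineLatest } from 'rxjs';
combineLatest(this.service1.source1$, this.service1.source2$).subscribe(
  ([res1, res2]: [YourDataType[], YourDataType[]]) => {
    // Filter your data with res1, res2 here.
    // Note: the RxJS filter operator would drop whole emissions,
    // not the items inside them, so it is not used for this.
  }
);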
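As a rough sketch of what "filter your data with res1, res2" could look like, the helper below applies both selections in a single pass. The User shape and the filterUsers name are hypothetical (the question only shows the country and company fields), and treating an empty selection as "no restriction" is an assumption you may want to change.

```typescript
// Hypothetical user shape; only country and company appear in the question.
interface User {
  name: string;
  country: string;
  company: string;
}

// Keeps users that match BOTH selections.
// Assumption: an empty list means "do not restrict by this field".
function filterUsers(data: User[], countries: string[], companies: string[]): User[] {
  return data.filter(user =>
    (countries.length === 0 || countries.indexOf(user.country) !== -1) &&
    (companies.length === 0 || companies.indexOf(user.company) !== -1)
  );
}

// Made-up sample data for illustration.
const sampleUsers: User[] = [
  { name: 'Ann', country: 'US', company: 'Acme' },
  { name: 'Bob', country: 'US', company: 'Globex' },
  { name: 'Cid', country: 'FR', company: 'Acme' },
];

console.log(filterUsers(sampleUsers, ['US'], ['Acme']).map(u => u.name)); // [ 'Ann' ]
console.log(filterUsers(sampleUsers, [], ['Acme']).map(u => u.name));     // [ 'Ann', 'Cid' ]
```

Inside the subscribe callback this would be called as this.filteredData = filterUsers(this.data, res1, res2), so the list is recomputed from both selections every time either one changes.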
Upvotes: 2