Reputation: 710
In a component, I subscribe to two services. the first one,
this.myFirstService.currentData.subscribe()
where currentData is from a BehaviorSubject
private currentDataSource = new BehaviorSubject<any>([]);
currentData = this.currentDataSource.asObservable();
The second one is from an http.get
getData(): Observable<Data> {
if(localStorage.getItem('token'))
var headers = new HttpHeaders({'Authorization':'Bearer ' + localStorage.getItem('token')});
else
var headers = new HttpHeaders({'Authorization':'Anonymous '});
return this.http.get<Data>("http://someurl/",{
headers:headers,
responseType: 'json'
})
.pipe();
}
I want to merge the datas received from both subscription. How can I archived that?
Upvotes: 11
Views: 14695
Reputation: 5719
Since you are using RxJs, there are a couple of ways you can do this:
Concat will combine two observables into a combined sequence, but the second observable will not start emitting until the first one has completed.
let first = Observable.timer(10,500).map(r => {
return {source:1,value:r};
}).take(4);
let second = Observable.timer(10,500).map(r => {
return {source:2,value:r};
}).take(4);
first.concat(second).subscribe(res => this.concatStream.push(res));
This will merge the two but you will receive the first observable result before the second:
0 1 2 3 0 1 2 3
Merge is similar to concat, but it will interleave the emitted values instead of completing the first observable before starting the second one.
let first = Observable.timer(10,500).map(r => {
return {source:1,value:r};
}).take(4);
let second = Observable.timer(10,500).map(r => {
return {source:2,value:r};
}).take(4);
first.merge(second).subscribe(res => this.mergeStream.push(res));
You will get:
0 0 1 1 2 2 3 3
We use forkJoin to execute observables in parallel. One common use case of this is making multiple http requests in parallel. In my sample I am forkJoining two very simple observables, but the key point is that the subscriber won't receive any values until both observables have completed.
let first = Observable.of({source:1,value:1});
let second = Observable.of({source:2,value:1});
Observable.forkJoin(first,second)
.subscribe((res:Array) => this.forkJoinStream = res);
This would be similar to $q.all() from Angular 1.x.
And finally
Use this if you have dependency between your observables. Lets say you have to get the user before you can get user details with two different http requests:
let first = Observable.of(10);
first.flatMap((operand1) => {
return Observable.of(operand1 + 10);
})
.subscribe(res => this.flatMappedStreams = {msg: '10 + 10 = ' + res});
You will get:
10 + 10 = 20
Upvotes: 19
Reputation: 10157
What do you mean to "merge" the data?
There are several ways of combining observable streams, what you'll probably need is combineLatest
operator.
currentData = this.currentDataSource.asObservable();
httpData = http.getData();
currentData.pipe(combineLatest(httpData))
.subscribe(([current, http]) => console.log(current, http));
Upvotes: 0
Reputation: 41445
You can use the forkJoin
to merge the observables.
currentData = this.currentDataSource.asObservable();
httpReq = this.getData();
forkJoin(currentData, httpReq ).subscribe((result) => {
// combine results
})
Upvotes: 1