lil-works
lil-works

Reputation: 710

angular waiting several subscribe to do something

In a component, I subscribe to two services. the first one,

this.myFirstService.currentData.subscribe()

where currentData is from a BehaviorSubject

  private currentDataSource = new BehaviorSubject<any>([]);
  currentData = this.currentDataSource.asObservable();

The second one is from an http.get

  getData(): Observable<Data> {
  if(localStorage.getItem('token'))
    var headers = new HttpHeaders({'Authorization':'Bearer ' +   localStorage.getItem('token')});
  else
    var headers = new HttpHeaders({'Authorization':'Anonymous '});

  return this.http.get<Data>("http://someurl/",{
    headers:headers,
    responseType: 'json'
  })
    .pipe();
}

I want to merge the datas received from both subscription. How can I archived that?

Upvotes: 11

Views: 14695

Answers (3)

Yaser
Yaser

Reputation: 5719

Since you are using RxJs, there are a couple of ways you can do this:

Concat

Concat will combine two observables into a combined sequence, but the second observable will not start emitting until the first one has completed.

let first = Observable.timer(10,500).map(r => {
  return {source:1,value:r};
}).take(4);
let second = Observable.timer(10,500).map(r => {
  return {source:2,value:r};
}).take(4);
first.concat(second).subscribe(res => this.concatStream.push(res));

This will merge the two but you will receive the first observable result before the second:

0 1 2 3 0 1 2 3

Merge

Merge is similar to concat, but it will interleave the emitted values instead of completing the first observable before starting the second one.

let first = Observable.timer(10,500).map(r => {
  return {source:1,value:r};
}).take(4);
let second = Observable.timer(10,500).map(r => {
  return {source:2,value:r};
}).take(4);
first.merge(second).subscribe(res => this.mergeStream.push(res));

You will get:

0 0 1 1 2 2 3 3 

forkJoin

We use forkJoin to execute observables in parallel. One common use case of this is making multiple http requests in parallel. In my sample I am forkJoining two very simple observables, but the key point is that the subscriber won't receive any values until both observables have completed.

let first = Observable.of({source:1,value:1});
let second = Observable.of({source:2,value:1});
Observable.forkJoin(first,second)
            .subscribe((res:Array) => this.forkJoinStream = res);

This would be similar to $q.all() from Angular 1.x.

And finally

flatMap

Use this if you have dependency between your observables. Lets say you have to get the user before you can get user details with two different http requests:

let first = Observable.of(10);
first.flatMap((operand1) => {
  return Observable.of(operand1 + 10);
})
.subscribe(res => this.flatMappedStreams = {msg: '10 + 10 = ' + res});

You will get:

10 + 10 = 20

Upvotes: 19

Roberto Zvjerković
Roberto Zvjerković

Reputation: 10157

What do you mean to "merge" the data?

There are several ways of combining observable streams, what you'll probably need is combineLatest operator.

currentData = this.currentDataSource.asObservable();
httpData = http.getData();
currentData.pipe(combineLatest(httpData))
    .subscribe(([current, http]) => console.log(current, http)); 

Upvotes: 0

Sachila Ranawaka
Sachila Ranawaka

Reputation: 41445

You can use the forkJoin to merge the observables.

  currentData = this.currentDataSource.asObservable();
  httpReq = this.getData();

  forkJoin(currentData, httpReq ).subscribe((result) => {
    // combine results
  })

Upvotes: 1

Related Questions