devnc
devnc

Reputation: 100

Subscribe inside a subscribe maintaining the order

Below coding has subscribe inside a subscribe. It gets a schedule_id and according to the schedule_id retrieve questions. This works perfectly but the order of getQuestion() is not guaranteed.

schedule_id#: 111, 222, 333, 444

getQuestion(schedule_id) output#: q1(222), q23(111), q12(444), q15(333)

But it should be#: q23(111), q1(222), q15(333), q12(444)

I need to execute getQuestion() according to the order of schedule_ids, wait until one process finishes before go to the next. Is it possible to use forkJoin() here?

const source = from(res.data);
source.pipe(map(({ schedule_id }) => schedule_id))
  .pipe(mergeMap(schedule_id => {
    let param = {
      "schedule_id": schedule_id
    }
    console.log("schedule_id#: ", schedule_id);
    return this.fetchApiService.getQuestion(param);
  }))
  .subscribe((result) => {
    console.log('result#:', result);
  })

Upvotes: 1

Views: 287

Answers (2)

Mrk Sef
Mrk Sef

Reputation: 8022

ConcatMap works just fine for me. Of course, with concatMap, you will never start the next observable until the previous one completes (that's how you maintain order). If you hand it an observable that never completes, you end up with a minor memory leak and nothing else.

If I take you code and make a lawful observable, it works for me.

  myConcatMapFunc(){
    let schedule_ids = from(['111','222','333','444']);
    schedule_ids.pipe(
      concatMap(data => {

        let param = {
          "schedule_id": data,
        }

        console.log('schedule_id#', data);

        return new Observable(subscriber => {
          subscriber.next('Q1');
          subscriber.next('Q2');
          subscriber.next('Q3');
          subscriber.complete();
          return { unsubscribe: () => {} };
        });

      })
    ).subscribe(result => {
      console.log('result#', result);
    })
  }

Upvotes: 1

Andrew Halil
Andrew Halil

Reputation: 1323

The operator concatMap() waits for the previous inner observable to complete before commencing the next observable, whereas mergeMap() processes multiple inner observables at the same time, with no guarantee of ordering.

Try this:

const source = from(res.data);
source.pipe(map(({ schedule_id }) => schedule_id))
  .pipe(concatMap(schedule_id => {
    let param = {
      "schedule_id": schedule_id
    }
    console.log("schedule_id#: ", schedule_id);
    return this.fetchApiService.getQuestion(param);
  }))
  .subscribe((result) => {
    console.log('result#:', result);
  })

Upvotes: 0

Related Questions