Kiran

Reputation: 421

RxJS: Observable stream piped to groupBy() followed by concatMap(); data for subsequent keys lost

I am trying to use the RxJS groupBy operator followed by concatMap to collect records into individual groups based on some keys.

I have noticed that when concatMap follows a groupBy operator, it seems to lose the data for all the keys that occur after the first one.

For Example:

Consider the following code block:

// DOES NOT WORK
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();

const result = clicks.pipe(
  groupBy(x => x.substr(2,1)),
  concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));

records.forEach(x => clicks.next(x));

// Expected Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
// 
// Actual Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// ...Nothing more -- no results for key 2 and 3

However, when I use the concatMap operator on its own, it behaves as expected.

// WORKS
const records = ['a', 'b', 'c', 'd', 'e', 'f', 'g'];
const clicks = new Subject();

const result = clicks.pipe(
  concatMap(ev => ev.subject$.pipe(take(4), map(x => ev.key + x))),
);
const subscription = result.subscribe(x => console.log(x));

records.forEach(x => clicks.next({key: x, subject$: interval(1000)}));

// Expected & Actual Output:
// a0
// a1
// a2
// a3
// b0
// b1
// b2
// b3
// c0
// c1
// c2
// c3
// d0
// d1
// d2
// d3
// e0
// e1
// e2
// e3
// f0
// f1
// f2
// f3
// g0
// g1
// g2
// g3

Reading through the documentation for RxJS groupBy and concatMap does not give me any clues as to what could be going on here, whereas the section on RxJS concatMap at reactivex.io leads me to believe that this should work.

Can anyone help me understand what's going on with the first scenario here? How can I get the first scenario to work?

Upvotes: 6

Views: 1457

Answers (2)

Meirion Hughes

Reputation: 26398

My answer is to supplement Kiran's and to note that you'll get exactly the same problem as described in the question if you employ an async mergeMap.

When you use groupBy, as Kiran explains, it internally creates a Subject per key that is immediately fed values from the source. The following works...

source.pipe(
  groupBy(item => item.id),
  mergeMap(byId => {
    return byId.pipe(map(x => service.put(x)));
  }),
);

... because (from what I can gather) the subscriptions are synchronous - mergeMap subscribes to each new grouping immediately (assuming no concurrency limits) and so it catches the data.

If you want to do something asynchronously, per grouping, you might try...

source.pipe(
  groupBy(item => item.id),
  mergeMap(async byId => {
    let service = await this.getSomething(byId.key);
    return byId.pipe(map(x => service.put(x)));
  }),
  mergeAll(),
);

... at which point the subscription to the grouping Observable is deferred until mergeAll and it will miss the initial data.

The solution is exactly as Kiran says: you must use a buffering subject so that the values can be replayed when the group is finally subscribed to: groupBy(item => item.id, null, null, () => new ReplaySubject()) will work perfectly fine.

My personal solution, born of not wanting any buffering after that initial subscription, was to make a custom BufferSubject that buffers only until the first subscription, then simply passes through next calls to the base Subject.

/** buffers items until the first subscription, then replays them and stops buffering */
export class BufferSubject<T> extends Subject<T>{
  private _events: T[] = [];

  constructor(private scheduler?: SchedulerLike) {
    super();
  }

  next(value: T) {
    this._events.push(value);    
    super.next(value);
  }

  _subscribe(subscriber: Subscriber<T>): Subscription {
    const _events =  this._events;

    //stop buffering
    this.next = super.next;
    this._events = null;    

    const scheduler = this.scheduler;
    const len = _events.length;
    let subscription: Subscription;

    if (this.closed) {
      throw new ObjectUnsubscribedError();
    } else if (this.isStopped || this.hasError) {
      subscription = Subscription.EMPTY;
    } else {
      this.observers.push(subscriber);
      subscription = new SubjectSubscription(this, subscriber);
    }

    if (scheduler) {
      subscriber.add(subscriber = new ObserveOnSubscriber<T>(subscriber, scheduler));
    }

    for (let i = 0; i < len && !subscriber.closed; i++) {
      subscriber.next(_events[i]);
    }

    if (this.hasError) {
      subscriber.error(this.thrownError);
    } else if (this.isStopped) {
      subscriber.complete();
    }

    return subscription;
  }
}

/** from rxjs internals */
export class SubjectSubscription<T> extends Subscription {
  closed: boolean = false;

  constructor(public subject: Subject<T>, public subscriber: Observer<T>) {
    super();
  }

  unsubscribe() {
    if (this.closed) {
      return;
    }

    this.closed = true;

    const subject = this.subject;
    const observers = subject.observers;

    this.subject = null;

    if (!observers || observers.length === 0 || subject.isStopped || subject.closed) {
      return;
    }

    const subscriberIndex = observers.indexOf(this.subscriber);

    if (subscriberIndex !== -1) {
      observers.splice(subscriberIndex, 1);
    }
  }
}

and used instead of the replay:

groupBy(item => item.id, null, null, () => new BufferSubject())

Upvotes: 0

Kiran

Reputation: 421

I finally seem to have figured out what the issue is here.

In Scenario #1 in the question above, the code pipes the source stream first into the groupBy operator, followed by the concatMap operator, and this combination of operators seems to be causing the problem.

Inner workings of groupBy and mergeMap

Reading through the code for the groupBy operator, I realized that groupBy internally creates a new Subject instance for each key that is found in the source stream. All values belonging to that key are then immediately emitted by that Subject instance.

All the Subject instances are wrapped into GroupedObservables and emitted downstream by the groupBy operator. This stream of GroupedObservable instances is the input to the concatMap operator.

The concatMap operator internally calls the mergeMap operator with a value of 1 for concurrency, which means only one source observable is subscribed to concurrently.

The mergeMap operator subscribes to only one observable, or to as many observables as allowed by the concurrency parameter, and holds all other observables in a queue until an active one has completed.
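That queuing behavior can be sketched in plain JavaScript. This is a minimal sketch with hypothetical names (mergeWithConcurrency, cold), not the rxjs implementation:

```javascript
// Minimal sketch (not the rxjs source): mergeMap with a concurrency
// limit keeps surplus inner sources in a queue and subscribes to the
// next one only when an active source completes. concatMap is the
// special case with concurrency = 1.
function mergeWithConcurrency(inners, observer, concurrency) {
  const queue = [...inners];
  let active = 0;
  function trySubscribe() {
    while (active < concurrency && queue.length > 0) {
      const inner = queue.shift();
      active++;
      inner.subscribe(observer, () => { active--; trySubscribe(); });
    }
  }
  trySubscribe();
}

// A "cold" inner source: it produces its values only once subscribed,
// so nothing is lost while it waits in the queue.
const cold = values => ({
  subscribe(next, complete) { values.forEach(v => next(v)); complete(); }
});

const received = [];
mergeWithConcurrency([cold([1, 2]), cold([3, 4])], v => received.push(v), 1);
// received is [1, 2, 3, 4]
```

The queue holds observables, not their emissions: a hot source such as groupBy's internal Subject keeps emitting while it waits in the queue, and those values are gone by the time it is subscribed.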

How does this create the problem?

Firstly, now that I have read through the code for these operators, I am not too sure if this is a "problem".

Nevertheless, the behavior I described in the question occurs because the groupBy operator emits individual values through the corresponding Subject instance immediately, while the mergeMap operator may not yet have subscribed to that particular Subject. Hence, all values from the source stream that are emitted through that Subject before the subscription are lost.

I have tried to illustrate this problem in a rough marble diagram: groupBy to concatMap issue
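The Subject-vs-ReplaySubject difference at the heart of this can be sketched in plain JavaScript. MiniSubject and MiniReplaySubject below are hypothetical stand-ins, not the rxjs classes:

```javascript
// Minimal sketch: a plain Subject drops values emitted while it has no
// subscribers; a ReplaySubject buffers and replays them.
class MiniSubject {
  constructor() { this.observers = []; }
  next(value) { this.observers.forEach(o => o(value)); }
  subscribe(observer) { this.observers.push(observer); }
}

class MiniReplaySubject extends MiniSubject {
  constructor() { super(); this.buffer = []; }
  next(value) { this.buffer.push(value); super.next(value); }
  subscribe(observer) {
    this.buffer.forEach(v => observer(v)); // replay everything seen so far
    super.subscribe(observer);
  }
}

const plain = new MiniSubject();
plain.next('b:2');                 // no subscriber yet -> lost
const seenPlain = [];
plain.subscribe(v => seenPlain.push(v));
// seenPlain is [] -- 'b:2' is gone, exactly like the values of the
// groups that concatMap has not subscribed to yet

const replay = new MiniReplaySubject();
replay.next('b:2');                // buffered
const seenReplay = [];
replay.subscribe(v => seenReplay.push(v));
// seenReplay is ['b:2'] -- the late subscriber still gets the value
```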

This is not a "problem" with the way these operators work, but perhaps with the way I understood these operators and possibly the documentation (particularly the documentation for concatMap, which may be a bit confusing for folks new to RxJS).

This can be easily fixed by getting the groupBy operator to use a ReplaySubject instead of a Subject to emit the grouped values. The groupBy operator accepts a subjectSelector parameter that allows us to replace the default Subject instance with a ReplaySubject instance.

The following code works:

// THIS VERSION WORKS
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();

const result = clicks.pipe(
  groupBy(x => x.substr(2,1), null, null, () => new ReplaySubject()),
  concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));

records.forEach(x => clicks.next(x));

// We also need to explicity complete() the source
// stream to ensure that the observable stream for
// the first GroupedObservable completes allowing
// the concatMap operator to move to the second
// GroupedObservable.
clicks.complete();

// Expected and Actual output
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }

Why does Scenario 2 work?

Scenario 2 in my question works fine because interval creates a cold Observable that does not start emitting values until it is subscribed to. Hence, all values from that Observable are still produced when mergeMap finally subscribes to it.
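That cold behavior can be sketched in plain JavaScript (coldCounter is a hypothetical stand-in for rxjs's interval; the real interval is also asynchronous, which does not matter for this point):

```javascript
// Minimal sketch: a cold observable runs its producer per subscription,
// so a late subscriber still sees everything from the start -- unlike a
// Subject, which emits whether or not anyone is listening.
const coldCounter = count => ({
  subscribe(next) { for (let i = 0; i < count; i++) next(i); }
});

const seen = [];
// "Subscribing late" costs nothing: the producer only starts now.
coldCounter(3).subscribe(v => seen.push(v));
// seen is [0, 1, 2]
```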

Upvotes: 9
