Michael B

Reputation: 343

rxjs - observable emits nothing (not completing?)

I have an Angular 6 app. I originally set up my backend with a REST API, but I'm starting to convert parts of it to use socket.io.

When I return the data from my REST API, the following works:

this.http.get(api_url + '/versions/entity/' + entityId).pipe(
  mergeMap((versions:IVersion[]) => versions),
  groupBy((version:IVersion) => version.type),
  mergeMap(group => group.pipe(
    toArray(),
    map(versions=> {
      return {
        type: group.key,
        versions: versions
      }
    }),
    toArray()
  )),
  reduce((acc, v) => acc.concat(v), [])
);

The express route:

router.get('/entity/:entityId', (req, res) => {
  const entityId = req.params.entityId;

  Version.getVersionsByEntity(entityId, (err, versions) => {
    if (err) {
      res.json({success: false, msg: err});
    } else {
      res.json(versions);
    }
  });
});

Which gets the data from my mongo database with mongoose:

export function getVersionsByEntity(entityId, callback) {
  console.log('models/version - get versions by entity');

  Version.find({'entity.entityId': entityId})
          .exec(callback);
}

However, when I make the exact same call with socket.io, the observable doesn't emit anything. I'm guessing it's because it never completes? Does an http call send a 'complete' message when the data is successfully transferred?

The sockets are sent from this service:

getVersionsByEntity(entityId): Observable<IVersion[]> {
    // create an observable that listens for the versionsByEntity message
    let observable = new Observable(observer => {
      this._socketService.socket.on('versionsByEntity', (data) => {
        observer.next(data);
      });
    });

    this._socketService.event('versionsByEntity', entityId);

    return <Observable<IVersion[]>> observable;
  }

Which calls the same mongoose function from the server.

The observable returned from the (socket) service does actually contain data; it's only once I add the toArray() operators that nothing is printed when I subscribe.

Could someone help me fix this and also explain the theory behind this? Is it not completing? Does http send a 'completed' message with Angular?

EDIT: I've created a simple stackblitz which is doing what I want but I want to remove the take(1) from the _dataService since I may want to update the data so I want to keep the observable open - https://stackblitz.com/edit/rxjs-toarray-problem

EDIT 2: I'm close by replacing toArray with the scan operator, but it seems to be emitting twice for the array. reduce() emits the correct data but seems to emit only on complete (like toArray), so it's no better - https://stackblitz.com/edit/rxjs-toarray-problem-tpeguu

Upvotes: 0

Views: 2006

Answers (2)

Picci

Reputation: 17762

In addition to what @m1ch4ls said, you have to consider how toArray works. toArray collects every value emitted by an Observable into a single array and emits that array only when the Observable completes. So for this to work, the Observable has to complete.

An Observable returned by the Angular http client always completes after its first notification, and therefore toArray works. A socket.io stream ends only when it is closed, and the Observable in your service never calls observer.complete(), so using toArray on it means you never get any value out of the Observable as long as the stream stays open.
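
To make the completion requirement concrete, here is a toy model of a toArray-like buffer in plain TypeScript. It is only a sketch: ToyObserver and bufferUntilComplete are made-up names for illustration, not RxJS internals, and the real operator does more. The principle is the same, though: values are buffered on next and handed over only on complete.

```typescript
// Toy model of a toArray-like buffer (hypothetical names, not RxJS code).
interface ToyObserver<T> {
  next(value: T): void;
  complete(): void;
}

// Returns an observer that buffers every value and passes the whole buffer
// to `emit` only when complete() is called.
function bufferUntilComplete<T>(emit: (all: T[]) => void): ToyObserver<T> {
  const buffer: T[] = [];
  return {
    next: (value: T) => { buffer.push(value); },
    complete: () => { emit(buffer); },
  };
}

const received: number[][] = [];
const observer = bufferUntilComplete<number>((all) => { received.push(all); });

observer.next(1);
observer.next(2);
// Nothing has been delivered yet: the source is still open.
const emissionsBeforeComplete = received.length;

observer.complete();
// Only now does the buffered array arrive downstream.
const emissionsAfterComplete = received.length;
```

A socket listener that only ever calls next corresponds to the first half of this example: the buffer keeps growing and nothing reaches the subscriber.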

On the other hand, if you want to close the stream after one notification, which is what happens if you use take(1), then you are better off staying with http requests. Socket streams are meant to be long-lived channels, so closing them after every single message goes against their nature.

UPDATED VERSION after comment

Here is the code that works without take

getData() {
    return this._dataService.getVersions().pipe(
      map(
        (versions: Array<any>) => versions.reduce(
          (acc, val) => {
            // find the group for this type, or start a new one
            const versionGroup = acc.find(el => el.type === val.type);
            if (versionGroup) {
              versionGroup.versions.push(val);
            } else {
              acc.push({type: val.type, versions: [val]});
            }
            return acc;
          }, []
        )
      ),
    );
  }

The key point to understand is that your service is returning an Array of things. To achieve your result you can work directly on that Array with Array methods; you do not need Observable operators. Please do not take the grouping logic I have used above as the right implementation. A real implementation would probably benefit from using something like lodash for grouping, but I did not want to overcomplicate things.
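
As one possible alternative to the find-based reduce, the grouping can be pulled out into a small single-pass helper. This is a sketch under assumptions: the Version and VersionGroup shapes and the groupByType name are hypothetical, and the real IVersion interface may have more fields.

```typescript
// Hypothetical shapes for illustration; the real IVersion may differ.
interface Version {
  type: string;
  name: string;
}

interface VersionGroup {
  type: string;
  versions: Version[];
}

// Groups versions by `type` in one pass, keeping types in first-seen order.
function groupByType(versions: Version[]): VersionGroup[] {
  const result: VersionGroup[] = [];
  // Prototype-free lookup table from type to its group, so keys such as
  // "constructor" cannot collide with inherited properties.
  const index: Record<string, VersionGroup> = Object.create(null);
  for (const version of versions) {
    let group = index[version.type];
    if (!group) {
      group = { type: version.type, versions: [] };
      index[version.type] = group;
      result.push(group);
    }
    group.versions.push(version);
  }
  return result;
}

const grouped = groupByType([
  { type: "model", name: "v1" },
  { type: "rig", name: "v1" },
  { type: "model", name: "v2" },
]);
```

Such a helper could be passed to the map operator in place of the inline reduce, and because map emits once per incoming Array, the result is recomputed every time the socket pushes new data.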

This is your original code

getData() {
    return this._dataService.getVersions().pipe(
      mergeMap((versions: Array<any>) => versions),
      groupBy((version:IVersion) => version.type),
      mergeMap(group => group.pipe(
        toArray(),
        map(versions=> {
          return {
            type: group.key,
            versions: versions
          }
        }),
        toArray()
      )),
      reduce((acc, v) => acc.concat(v), [])
    )
  }

What this does is

  1. The first mergeMap turns the Array returned by the service into a stream of individual objects.

  2. The groupBy operator creates a new Observable which emits one group Observable per distinct type; each group carries the objects that share that key.

  3. The second, more complex mergeMap subscribes to each group: the first toArray collects the group's objects into an Array, map transforms that Array into an Object of type {type: string, versions: []}, and the second toArray wraps that Object in yet another Array.

  4. Finally, reduce concatenates those Arrays to create your final Array.

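Why does it work only with take? Because the group Observables emitted by groupBy complete only when the source Observable completes, so the toArray calls inside the second mergeMap emit nothing until then. That makes sense if you want to group a finite set of things. The reduce operator behaves the same way: it emits only on completion.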

take is a way to complete the source Observable.

Upvotes: 1

m1ch4ls

Reputation: 3435

Handling event streams is a little bit different. Here is the code I would propose:

getVersionsByEntity(entityId): Observable<IVersion[]> {
    return defer(() => {
        this._socketService.event('versionsByEntity', entityId);

        return fromEvent(this._socketService.socket, 'versionsByEntity')
            .pipe(take(1))
    }) as Observable<IVersion[]>;
}

The main idea is to wrap everything in defer, which makes the Observable lazy: socketService.event is called only when the Observable is subscribed to (your original implementation is eager, calling socketService.event right away). An eager implementation can have unintended consequences: it's easy to miss the event if the Observable is subscribed to too late.

I also suggest using fromEvent Observable factory - which handles event listener setup and tear down.

Finally, to complete the Observable after the first emission I've added take(1). This limits the number of emissions to 1, then completes the Observable and unsubscribes from the source.

Upvotes: 1
