Sn0opr

Reputation: 1026

RxJS: combine multiple observables dynamically over time

My problem involves recursion: I don't know when my Observable should call complete() inside a recursive function.

Basically, I have a function that scrapes a paginated website and returns an Observable that pushes the parsed items found on each page via .next(data). It does this recursively until it reaches the last page, at which point it calls the .complete() method of the Subject.

I'm using the concat(Observable) method to concatenate the new Observable with the current one recursively, but it doesn't seem to work: when I subscribe to my Observable, I only get the items from the first page, which makes me guess that concat() is not working in my case.

Here is a simplified version of my function code.

```
crawlList(url) {
    let obsSubject = new Subject();
    request(url, function (err, response, body) {
        // check if everything is alright...
        // parsing data...
        obsSubject.next(parsedData);
        // now we check if we can still paginate (we are not on the last page)
        // if so, we concat our observable with the new observable recursively
        obsSubject.concat(crawlList(url))
        // else (we are on the last page)
        obsSubject.complete();
    });
    return obsSubject;
}
```

Upvotes: 0

Views: 931

Answers (1)

paulpdaniels

Reputation: 18663

In general avoid using Subjects unless you are certain you can't do it with operators.
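
As for why the original attempt only yields the first page: one likely reason is that concat returns a new Observable rather than modifying obsSubject, so the result of obsSubject.concat(...) is simply discarded. Plain arrays have the same pitfall, which makes for a small illustration. This is an analogy only, not RxJS code, and the array names are made up for the example:

```javascript
// Hypothetical page contents, standing in for the parsed items.
const firstPage = ['item 1', 'item 2'];
const secondPage = ['item 3'];

// Return value discarded: firstPage itself is left untouched.
firstPage.concat(secondPage);
const afterDiscard = firstPage.slice();

// The combined sequence only exists in the value that concat returns.
const combined = firstPage.concat(secondPage);

console.log(afterDiscard.length); // → 2
console.log(combined.length);     // → 3
```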

In this case I would expect expand to work: the result of the previous stream is fed back into the operator so that it can be executed recursively.

Something like:

```
// Convert the callback into an Observable
const rxRequest = Rx.Observable.bindNodeCallback(
  request,
  (response, body) => ({response, body})
);

// Feed the initial data set, by default we should continue
Rx.Observable.of({url: baseUrl, shouldContinue: true})
  .expand(({url, shouldContinue}) => {
    // Return an empty stream to cancel the Observable
    // note this is from the *previous* iteration
    if (!shouldContinue)
      return Rx.Observable.empty();

    // This is how we call the newly created method
    return rxRequest(url)
      .map(({response, body}) =>
        // Parse data
        // Check if you should continue or not
        // We still need to emit this data so we can't cancel until the next
        // go-around
        ({url: newUrl, data: parsedData, shouldContinue})
      );
  })
  // expand also emits the seed value, which carries no data, so drop it
  .skip(1)
  // Downstream only cares about the data part
  .pluck('data')
```
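
To make the feedback loop easier to see, here is a minimal plain JavaScript sketch of the same idea. It is not RxJS: it is a synchronous analogue built on a generator, and `expandSync`, `fetchPage`, `fakePages` and the URLs are all hypothetical names invented for the illustration. Each emitted value is passed back into the projection function to produce the next one, and the `shouldContinue` flag from the previous step decides whether to stop.

```javascript
// Hypothetical in-memory "site": each URL maps to its items and the
// URL of the next page (null on the last page).
const fakePages = {
  '/list?page=1': { items: ['a', 'b'], next: '/list?page=2' },
  '/list?page=2': { items: ['c'], next: '/list?page=3' },
  '/list?page=3': { items: ['d', 'e'], next: null },
};

// Stand-in for the HTTP request plus the parsing step.
function fetchPage(url) {
  const page = fakePages[url];
  return { url: page.next, data: page.items, shouldContinue: page.next !== null };
}

// Synchronous analogue of expand: emit each value, then feed it back
// into `project` to obtain the values that follow it.
function* expandSync(seed, project) {
  const queue = [seed];
  while (queue.length > 0) {
    const current = queue.shift();
    yield current;
    queue.push(...project(current));
  }
}

const results = [];
const seed = { url: '/list?page=1', shouldContinue: true };
const steps = expandSync(seed, ({ url, shouldContinue }) =>
  // An empty array plays the role of the empty stream: nothing more to do.
  shouldContinue ? [fetchPage(url)] : []
);

for (const step of steps) {
  // The seed carries no data, so only the later steps contribute items.
  if (step.data !== undefined) results.push(...step.data);
}

console.log(results); // → [ 'a', 'b', 'c', 'd', 'e' ]
```

Note that the last page still emits its items before the loop ends, because the stop decision is made one step later, just as in the expand version.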

Upvotes: 3
