Sander Vanden Hautte

Reputation: 2543

How do I periodically crawl a REST API using RxJs?

I'm using RxJs's IntervalObservable to poll a REST API every second for new sensor data. The REST API responds with "buckets" of sensor measurements, each covering 10 seconds, so a response may also contain an HTTP 'link' header with a rel="next" entry that points to a more recent bucket of sensor data, if one is available.

My current implementation (see below) has 2 problems:

Do you have some advice for these mixed observables?

export class WidgetService {
  private widget: Widget;
  private visualizer: any;
  private updateScheduler: Subscription;
  private timestampOfMostRecentObservation?: number;

  constructor(private sensorGateway: SensorGatewayCommunicationService) { }

  public initializeVisualization() {
    this.visualizer = new TimeSeriesLineChartWithTimeRangeSelector();
    this.visualizer.draw(`widget-${this.widget.id}-visualization`, this.widget.name, this.widget.seriesName);
    // First update of the visualization with sensor data since a timestamp in the past (121 seconds ago here):
    const initialFromTimestamp = Date.now() - 121 * 1000;
    this.updateVisualization(initialFromTimestamp);
    // Next updates of the visualization are scheduled every second:
    this.updateScheduler = IntervalObservable.create(1000)
      .subscribe(() => this.updateVisualization(this.timestampOfMostRecentObservation));
  }

  public destroy() {
    this.updateScheduler.unsubscribe();
    this.visualizer.destroy();
  }

  private updateVisualization(fromTimestamp: number) {
    const urlForNewObservations = this.widget.sensorMeasurementsUrl + `?from=${fromTimestamp.toString()}`;
    this.getSensorObservations(urlForNewObservations)
      .pipe(
        expand(({sensorObservations, nextUrl}) => { // https://ncjamieson.com/understanding-expand/
          if (sensorObservations && sensorObservations.length > 0 && nextUrl) {
            return this.getSensorObservations(nextUrl);
          } else {
            return empty();
          }
        }),
        concatMap(({sensorObservations}) => sensorObservations),
      )
      .subscribe((sensorObservations: [number, number][]) => {
        const downsampledObservations = this.downsampleSensorObservations(sensorObservations);
        this.visualizer.update(downsampledObservations);
      });
  }

  private getSensorObservations(urlForNewObservations: string): Observable<{
    sensorObservations: object[],
    nextUrl: string | null
  }> {
    return this.sensorGateway.getApiResource(urlForNewObservations).pipe(
      map(response => {
        if ('values' in response.body) {
          return {
            sensorObservations: response.body['values'].map(observation => [
              observation[0],
              observation[1]
            ]),
            nextUrl: this.getNextLinkUrl(response)
          };
        } else {
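          // Return an empty result instead of null so that callers can safely destructure it.
          return { sensorObservations: [], nextUrl: null };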
        }
      })
    );
  }

  private getNextLinkUrl(response: HttpResponse<object>): string | null {
    if (response.headers.has('link')) {
      const linkHeader = response.headers.get('link');
      /* Example of a linkHeader:
       *'</sensors/1/properties/1/observations/20180711/12/19?from=1531311594456>; rel="self",
       * </sensors/1/properties/1/observations/20180711/12/18>; rel="prev",
       * </sensors/1/properties/1/observations/20180711/12/20>; rel="next"' */
      const links = linkHeader.split(',');
      const nextLink = links.find(link => link.endsWith('; rel="next"'));
      if (nextLink) {
        return nextLink.substring(nextLink.indexOf('<') + 1, nextLink.indexOf('>'));
      } else {
        return null;
      }
    }
    // No 'link' header at all: return null explicitly to match the declared return type.
    return null;
  }
}

Upvotes: 1

Views: 396

Answers (1)

Mark Hughes

Reputation: 7374

Instead of using the subscription of one observable to trigger another, I would flip the problem on its head and create a single observable which does what you want.

I would propose something like this in your initialise method:

let fromTimestamp = Date.now() - 121 * 1000;
// Create a base observable, doesn't really matter what it is
this.subscription = of(true).pipe(
    // Map to the right call for the current timestamp
    flatMap(() => {
        const urlForNewObservations = this.widget.sensorMeasurementsUrl + `?from=${fromTimestamp.toString()}`;
        return this.getSensorObservations(urlForNewObservations);
    }),

    // Repeat the REST call while the sensor returns a next URL:
    expand(({sensorObservations, nextUrl}) => { // https://ncjamieson.com/understanding-expand/

      if (sensorObservations && sensorObservations.length > 0 && nextUrl) {
        // Set the fromTimestamp for the next set of observations.
        fromTimestamp = this.parseTimestamp(nextUrl, fromTimestamp);
        return this.getSensorObservations(nextUrl);
      } else {
        return empty();
      }
    }),
    concatMap(({sensorObservations}) => sensorObservations),

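    // Keep repeating this, but wait a second after each run completes before
    // resubscribing. A plain delay(1000) placed after repeat() would only shift
    // the emissions by a second; the requests would still fire back to back.
    // repeatWhen comes from 'rxjs/operators'; in RxJS 7+, repeat({ delay: 1000 })
    // does the same.
    repeatWhen(completed => completed.pipe(delay(1000))),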

    // Terminate the whole thing when the service is destroyed / stopped.              
    takeWhile(() => !this.destroyed)  
).subscribe((sensorObservations: [number, number][]) => {
    const downsampledObservations = this.downsampleSensorObservations(sensorObservations);
    this.visualizer.update(downsampledObservations);
});

You'll need to implement parseTimestamp to parse the relevant next timestamp from the URL or similar.
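As a rough sketch of what parseTimestamp could look like: the version below assumes the next URL either carries an explicit from=<milliseconds> query parameter or ends in a YYYYMMDD/HH/mm bucket path like the Link header example in the question, read as UTC (the "self" link in that example is consistent with this). Both the URL shape and the fallback behaviour are assumptions; adjust them to whatever your API actually returns.

```typescript
// Hypothetical helper: derive the next `from` timestamp (ms since epoch) from a "next" URL.
// Assumed URL shapes (taken from the Link header example in the question):
//   /sensors/1/properties/1/observations/20180711/12/20   (YYYYMMDD/HH/mm, read as UTC)
//   ...optionally with an explicit ?from=<ms> query parameter.
function parseTimestamp(nextUrl: string, fallback: number): number {
  // Prefer an explicit ?from=<milliseconds> parameter when the URL carries one.
  const fromParam = /[?&]from=(\d+)/.exec(nextUrl);
  if (fromParam) {
    return Number(fromParam[1]);
  }
  // Otherwise read the bucket start from the trailing path segments.
  const bucket = /\/(\d{4})(\d{2})(\d{2})\/(\d{2})\/(\d{2})(?:[?#]|$)/.exec(nextUrl);
  if (bucket) {
    const [year, month, day, hour, minute] = bucket.slice(1).map(Number);
    // Date.UTC expects a zero-based month.
    return Date.UTC(year, month - 1, day, hour, minute);
  }
  // Unknown URL shape: keep the previous timestamp rather than guessing.
  return fallback;
}
```

Inside WidgetService this would be a private method, called as this.parseTimestamp(nextUrl, fromTimestamp) in the expand step.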

Then implement ngOnDestroy so that it sets this.destroyed to true and calls this.subscription.unsubscribe() if a subscription exists; that stops the polling when the service is destroyed. If you also want to stop it manually, do the same in your destroy method.
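The teardown could be sketched roughly as follows. The class name PollingTeardown and the local Unsubscribable interface are stand-ins so the snippet runs without RxJS; in the real service, subscription would be the RxJS Subscription returned by subscribe(), and these members would live on WidgetService itself.

```typescript
// Minimal stand-in for the part of an RxJS Subscription used here (assumption
// made so the sketch is self-contained).
interface Unsubscribable {
  unsubscribe(): void;
}

// Hypothetical holder for the teardown logic.
class PollingTeardown {
  // Read by takeWhile(() => !this.destroyed) in the polling pipeline.
  destroyed = false;
  subscription?: Unsubscribable;

  // Angular calls ngOnDestroy on a service when its injector is destroyed.
  ngOnDestroy(): void {
    this.destroy();
  }

  // Can also be called manually; calling it more than once is harmless.
  destroy(): void {
    this.destroyed = true;
    if (this.subscription) {
      this.subscription.unsubscribe();
      this.subscription = undefined;
    }
  }
}
```

Clearing the reference after unsubscribing keeps a second destroy() call from touching an already closed subscription.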

Upvotes: 1
