Andrew Lobban
Andrew Lobban

Reputation: 2137

Get RxJS Observable to return initial data immediately and then repeat every X seconds afterwards?

Below is a code sample that gives me results every 30 seconds, however, when a component subscribes to the service, it also has to wait 30 seconds for the initial set of data:

getTasks(query: string): Observable<any> {
  return this._http.get(this._ripcord + '/tasks' + query, this.getHttpOptions())
    .map((response: Response) => <any>response.json())
    .delay(30000)
    .repeat()
    .do(data => console.log(data))
    .catch(this.handleError);
}

I looked at a similar issue on stack but this solution doesn't work for me.

Does anyone know what operator I can use, in combination, to retrieve the initial data immediately on subscribe?

Upvotes: 0

Views: 1350

Answers (4)

Andrew Lobban
Andrew Lobban

Reputation: 2137

Thank you all for your responses as they helped me create my own solutions!

getTasks(query: string): Observable<any> {
  return Observable
    .timer(0, 30000)
    .switchMap(() =>
      this._http.get(this._ripcord + '/tasks' + query, this.getHttpOptions())
        .map((response: Response) => <any>response.json())
        .do(data => console.log(data))
        .catch(this.handleError)
    );
}

OR

getTasks(query: string): Observable<any> {
  return Observable
    .interval(30000)
    .startWith(0)
    .switchMap(() =>
      this._http.get(this._ripcord + '/tasks' + query, this.getHttpOptions())
        .map((response: Response) => <any>response.json())
        .do(data => console.log(data))
        .catch(this.handleError)
    );
}

OR

getTasks(query: string): Observable<any> {
  return Observable
    .timer(0, 30000)
    .switchMap(() =>
      this._http.get(this._ripcord + '/tasks' + query, this.getHttpOptions())
        .map((response: Response) => <any>response.json())

    )
    .do(data => console.log(data))
    .catch(this.handleError);
}

I realized I needed to utilize switchMap as map was returning an Observable in the subscription to the Http service GET response of the getTAsks endpoint. You can put the DO and the CATCH operators optionally on the switchMap instead of the map operator.

Upvotes: 1

JayChase
JayChase

Reputation: 11525

If you need to keep the observable sequence going when the http request returns an error catch the error when making the request and return an empty observable.

  getTasks(query: string): Observable<any> {
    return Observable
      .interval(30000)
      .startWith(0)
      .switchMap(event => {
        this._http
          .get(this._ripcord + '/tasks' + query, this.getHttpOptions())
          .catch(error => Observable.empty());
      })
      .map((response: Response) => <any>response.json())
      .do(data => console.log(data))
      .catch(this.handleError);
  }

Otherwise the sequence will stop on the first error.

Upvotes: 0

GreyBeardedGeek
GreyBeardedGeek

Reputation: 30088

interval.startWith (as referenced in the solution that you point to) is indeed the answer.

I suspect that what you really want is something like:

Observable
  .interval(30000)
  .startWith(0)
  .map(event =>
    this._http.get(this._ripcord + '/tasks' + query, this.getHttpOptions())
  )
  .do(data => console.log(data))
  .catch(this.handleError);

Upvotes: 0

nayakam
nayakam

Reputation: 4249

You could use timer. With timer you can specify when should emission start. Following sample emits first value immediately then it takes 5 secs to emit subsequent values.

var sourceTimer = Rx.Observable
        .timer(0, 5000)
        .timeInterval()
        .take(10);

var subscriptionTimer = sourceTimer.subscribe(
        function (x) {
            console.log('Next: ' + JSON.stringify(x));
        },
        function (err) {
            console.log('Error: ' + err);
        },
        function () {
            console.log('Completed');
        });

Output:

Next: {"value":0,"interval":4}
Next: {"value":1,"interval":5004}
Next: {"value":2,"interval":5000}

Interval & startWith also work, immediately emits the value zero which you provided as parameter to startWith then emits subsequent values in provided interval.

  var source = Rx.Observable
    .interval( 5000 /* ms */)
    .startWith(0)
    .timeInterval()
    .take(10);

 var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + JSON.stringify(x));
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

Output:

Next: {"value":0,"interval":1}
Next: {"value":0,"interval":5003}
Next: {"value":1,"interval":5000}

Next: {"value":7,"interval":5000}
Next: {"value":8,"interval":5001}

Upvotes: 0

Related Questions