MrYutz

Reputation: 438

RXJS Observables - Run a repeating function every 500ms and output results to an observable stream

First off - I'm new to Observables. Coming from Python.

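I am trying to achieve the following: run a function every 500 ms and push its results to an observable stream.

The code below works, except that it doesn't repeat. I can't figure out how to use timer, delay, or interval for this with RxJS 6.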


// In this example the chart has 3 horizontal lines at 5,10 and 17 "prices"
// Desired output is 5 -- 10 -- 17
// When 4th line is added at 20 "price" expected output would be
// 5 -- 10 -- 17 -- 20


const test$ = new Observable<number>(observer => {
      chart
      .activeChart()
      .getAllShapes()
      .forEach((shape) => {
        if (shape.name == 'horizontal_line') {
          chart
            .activeChart()
            .getShapeById(shape.id)
            .getPoints()
            .forEach((point) => {
              observer.next(+point.price.toFixed(4));
            });
        }
      })

    }).pipe(
      delay(500),
      repeat()
    );

Upvotes: 1

Views: 1894

Answers (4)

Barremian

Reputation: 31105

I'm not sure what you've tried so far with RxJS timer and interval. You could use timer and turn each tick into the required data with the map operator. Inside it, Array#map together with Array#filter can transform the data as required.

timer(0, 500).pipe(   // <-- emit immediately and after every 500ms thereafter
  map(_ => 
    chart
      .activeChart()
      .getAllShapes()
      .map(shape => {
        if (shape.name === 'horizontal_line')
          return chart.activeChart().getShapeById(shape.id).getPoints()  // <-- look up the shape by id, as in the question
      })
      .filter((points) => !!points)  // <-- filter `undefined` from previous step
  )
).subscribe({
  next: points => {
    /* output:
      [
        [ <collection of points from line 5> ],
        [ <collection of points from line 10> ],
        [ <collection of points from line 17> ],
        ...
      ]
    */
  }
});

If for whatever reason you wish to combine all the points into a single collection and emit them one by one to the subscription, you'd need to use the switchMap operator with the RxJS from function.

timer(0, 500).pipe(
  map(_ => 
    chart
      .activeChart()
      .getAllShapes()
      .map(shape => {
        if (shape.name === 'horizontal_line')
          return chart.activeChart().getShapeById(shape.id).getPoints()  // <-- look up the shape by id, as in the question
      })
      .filter((points) => !!points)  // <-- filter `undefined` from previous step
  ),
  switchMap(pointsColl => {
    return from(pointsColl.flat())
  })
).subscribe({
  next: points => {
    /* output:
      point 1 from line 5
      point 2 from line 5
      ...
    */
  }
});

If you instead need to emit the collection of all the points as a single array, you'd need to use the of function instead of from.

// <repeat from above>
  switchMap(pointsColl => {
    return of(pointsColl.flat())
  })
).subscribe({
  next: points => {
    /* output:
      [ <collection of points from line 5, 10 and 17> ]
    */
  }
});
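
To make the difference between from and of concrete, here is a minimal sketch that runs without a chart. The prices array is a hypothetical stand-in for pointsColl.flat(); everything else is plain RxJS.

```typescript
import { from, of } from 'rxjs';

// Hypothetical stand-in for pointsColl.flat(): prices of three horizontal lines.
const prices: number[] = [5, 10, 17];

const viaFrom: number[] = [];
const viaOf: number[][] = [];

// from() unpacks the array: one emission per element.
from(prices).subscribe(price => viaFrom.push(price));

// of() emits its argument as-is: a single emission holding the whole array.
of(prices).subscribe(all => viaOf.push(all));

console.log(viaFrom); // [ 5, 10, 17 ]
console.log(viaOf);   // [ [ 5, 10, 17 ] ]
```

Because of(x) emits exactly once, the of variant inside switchMap should behave the same as a plain map(pointsColl => pointsColl.flat()).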

Upvotes: 1

BizzyBob

Reputation: 14740

Run a repeating function every 500ms and output results to an observable stream

I think the simplest way to do this is to use interval (or timer if you need to emit immediately) to create a stream of emissions every 500ms, then simply map each emission to your function's return value:

test$ = interval(500).pipe(
    map(() => theFunction())
);

Here is a working StackBlitz of this simple example. Obviously, you would make the function do whatever you need it to do :-)
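
As a rough idea of what theFunction could look like for the chart in the question, here is a sketch. The interfaces and the fake chart are assumptions inferred from the question's code, not the real charting library types, and readHorizontalLinePrices is a made-up name.

```typescript
// Hypothetical minimal shapes of the chart API, inferred from the question's code.
interface Point { price: number; }
interface ShapeInfo { id: string; name: string; }
interface ActiveChart {
  getAllShapes(): ShapeInfo[];
  getShapeById(id: string): { getPoints(): Point[] };
}

// Collect the price of every horizontal line, rounded to 4 decimals.
function readHorizontalLinePrices(active: ActiveChart): number[] {
  const prices: number[] = [];
  for (const shape of active.getAllShapes()) {
    if (shape.name !== 'horizontal_line') continue;
    for (const point of active.getShapeById(shape.id).getPoints()) {
      prices.push(+point.price.toFixed(4));
    }
  }
  return prices;
}

// Fake chart for illustration only: two horizontal lines and one other shape.
const pointsById: { [id: string]: Point[] } = {
  a: [{ price: 5 }],
  b: [{ price: 7 }, { price: 8 }],
  c: [{ price: 10.123456 }],
};
const fakeChart: ActiveChart = {
  getAllShapes: () => [
    { id: 'a', name: 'horizontal_line' },
    { id: 'b', name: 'trend_line' },
    { id: 'c', name: 'horizontal_line' },
  ],
  getShapeById: (id: string) => ({ getPoints: () => pointsById[id] }),
};

console.log(readHorizontalLinePrices(fakeChart)); // [ 5, 10.1235 ]
```

With something like this in place, the stream would be interval(500).pipe(map(() => readHorizontalLinePrices(chart.activeChart()))), and each emission would be the full list of prices at that moment.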

Upvotes: 1

Steve Holgado

Reputation: 12071

The repeat operator resubscribes only after the source completes, so it will not work unless your observable completes:

const test$ = new Observable<number>(observer => {
  chart
    .activeChart()
    .getAllShapes()
    .forEach((shape) => {
      if (shape.name == 'horizontal_line') {
        chart
          .activeChart()
          .getShapeById(shape.id)
          .getPoints()
          .forEach((point) => {
            observer.next(+point.price.toFixed(4));
          });
      }
    })

  observer.complete(); // <-- Complete observable after all emissions
}).pipe(
  delay(500),
  repeat()
);
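
The effect of complete() can be checked in isolation with a small sketch. It assumes hard-coded prices in place of the chart, drops delay so that everything runs synchronously, and uses repeat(2) rather than repeat(), since an unbounded repeat over a synchronous source would never return. The prices$ helper is a made-up name.

```typescript
import { Observable } from 'rxjs';
import { repeat } from 'rxjs/operators';

// Build a source that emits 5, 10, 17 and optionally completes.
function prices$(complete: boolean): Observable<number> {
  return new Observable<number>(observer => {
    [5, 10, 17].forEach(price => observer.next(price));
    if (complete) {
      observer.complete();
    }
  });
}

const withoutComplete: number[] = [];
const withComplete: number[] = [];

// Never completes, so repeat(2) never gets the chance to resubscribe.
prices$(false).pipe(repeat(2)).subscribe(p => withoutComplete.push(p));

// Completes after each pass, so repeat(2) runs the source twice in total.
prices$(true).pipe(repeat(2)).subscribe(p => withComplete.push(p));

console.log(withoutComplete); // [ 5, 10, 17 ]
console.log(withComplete);    // [ 5, 10, 17, 5, 10, 17 ]
```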

Upvotes: 1

Z Dafoe

Reputation: 126

One option is to use the setTimeout function to repeat your code instead of an observable.

Here is an example of doing so.

export class SomeComponent implements OnInit,OnDestroy {

    Alive = true

    ngOnInit(){
        this.repeating_function()
        
    }
    ngOnDestroy(){
        this.Alive = false
    }

    repeating_function(){
        // some code

        //repeat the function every 500 ms while the component is active
        if (this.Alive){
            setTimeout(() => {this.repeating_function()}, 500);
        }
    }
}
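
Note that with the flag alone, a timeout that is already scheduled still fires once after ngOnDestroy and runs the code one more time before stopping. A framework-free sketch of the same idea that also cancels the pending timer might look like this; startPolling is a hypothetical helper name, not part of Angular or RxJS.

```typescript
// Run `task` now, then again `intervalMs` after each run finishes.
// Returns a function that stops any further runs.
function startPolling(task: () => void, intervalMs: number): () => void {
  let alive = true;
  let handle: ReturnType<typeof setTimeout> | undefined;

  const run = (): void => {
    if (!alive) return;
    task();
    // Schedule the next run only after the current one has finished.
    handle = setTimeout(run, intervalMs);
  };

  run();

  return () => {
    alive = false;
    // Cancel the pending run so nothing fires after stopping.
    if (handle !== undefined) clearTimeout(handle);
  };
}

let calls = 0;
const stop = startPolling(() => { calls += 1; }, 500);
stop();
console.log(calls); // 1
```

In a component, the returned stop function would be stored in ngOnInit and called from ngOnDestroy.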

Upvotes: 2
