wiemann

Reputation: 395

Delaying rxjs 5 Observable emission by time defined in data with scheduler

I want to delay each item in a stream by the delay defined in the item itself:

Rx.Observable.from([
  {message: "one", delay: 100}, 
  {message: "two", delay: 500}, 
  {message: "three", delay: 10500}
]).subscribe((e) => console.log(e.message))
  1. It should log "one" after 100 ms, "two" after 500 ms.
  2. I would like to be able to cancel the timer before the message “three” is emitted.

How would I define an RxJS 5 scheduler to accomplish that?

Upvotes: 0

Views: 692

Answers (1)

Mark van Straten

Reputation: 9425

Depending on how you want the delays to be applied, there are two routes you can take:

Delay the given time between each emission

With .concatMap, each emission has to wait for the previous delayed emission to complete before its own delay starts:

Rx.Observable.from([
  {message: "one", delay: 500}, 
  {message: "two", delay: 500}, 
  {message: "three", delay: 500}
])
.concatMap(val => Rx.Observable.of(val.message).delay(val.delay))
.subscribe(val => console.log(val))
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

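Each emission arrives only after the previous one has completed, so the delays accumulate: the messages are logged roughly 500 ms, 1000 ms, and 1500 ms after subscription.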

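Delay relative to the start of the stream

With .mergeMap, every inner Observable is subscribed to as soon as its source value arrives, and its delay starts counting at that moment. Because the source here emits all of its values synchronously, every delay is effectively measured from the start of the stream.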

Rx.Observable.from([
  {message: "one", delay: 500}, 
  {message: "two", delay: 500}, 
  {message: "three", delay: 500}
])
.mergeMap(val => Rx.Observable.of(val.message).delay(val.delay))
.subscribe(val => console.log(val))
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

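Note that all three emissions arrive at the same time, about 500 ms after subscription, because every delay in this example is 500 ms and all three timers start together.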
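
To see what the two operators would do with the delays from the question, the arithmetic can be sketched in plain JavaScript. The helper names concatMapTimes and mergeMapTimes are hypothetical, not RxJS functions; they only compute the expected emission times in milliseconds after subscription and ignore scheduling overhead.

```javascript
// Data from the question.
const items = [
  {message: "one", delay: 100},
  {message: "two", delay: 500},
  {message: "three", delay: 10500}
];

// Hypothetical helper: with concatMap each delay starts when the previous
// inner Observable completes, so the delays add up.
function concatMapTimes(items) {
  let elapsed = 0;
  return items.map(item => (elapsed += item.delay));
}

// Hypothetical helper: with mergeMap on a synchronous source every delay
// starts at subscription, so each item keeps its own delay.
function mergeMapTimes(items) {
  return items.map(item => item.delay);
}

console.log(concatMapTimes(items)); // [ 100, 600, 11100 ]
console.log(mergeMapTimes(items));  // [ 100, 500, 10500 ]
```

With the question's data, only the .mergeMap variant logs "two" at about 500 ms; .concatMap would log it at about 600 ms.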

Because this uses only standard Rx operators, cancellation support is built in and no custom scheduler is needed. Unsubscribe from the stream and all pending emissions are cancelled.

Upvotes: 4
