Reputation: 395
I want to delay each item in a stream by the delay specified in the item itself:
Rx.Observable.from([
{message: "one", delay: 100},
{message: "two", delay: 500},
{message: "three", delay: 10500}
]).subscribe((e) => console.log(e.message))
How would I define an RxJS 5 scheduler to accomplish that?
Upvotes: 0
Views: 692
Reputation: 9425
Depending on how you want the delays to behave, there are a few routes you can take:
Using .concatMap
Each emission has to wait for the previous delayed emission to complete before its own delay starts:
Rx.Observable.from([
{message: "one", delay: 500},
{message: "two", delay: 500},
{message: "three", delay: 500}
])
.concatMap(val => Rx.Observable.of(val.message).delay(val.delay))
.subscribe(val => console.log(val))
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
Each emission arrives only after the previous one has completed, so with these values the messages arrive at roughly 500 ms, 1000 ms, and 1500 ms.
Using .mergeMap
All inner observables are subscribed as soon as their source values arrive, so every delay starts counting at that moment:
Rx.Observable.from([
{message: "one", delay: 500},
{message: "two", delay: 500},
{message: "three", delay: 500}
])
.mergeMap(val => Rx.Observable.of(val.message).delay(val.delay))
.subscribe(val => console.log(val))
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
Note that all three emissions arrive at the same time, roughly 500 ms after subscription, because the delays are equal and they all start together.
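With the unequal delays from the question, the two operators give different timings. The following is a rough sketch in plain JavaScript (no RxJS involved) that only computes the expected arrival times under each strategy. The helper names `concatArrivals` and `mergeArrivals` are made up for illustration, and the numbers assume the source emits all items synchronously, as `Rx.Observable.from` does with an array.

```javascript
// Delays taken from the question; arrival times are computed, not measured.
const items = [
  { message: "one", delay: 100 },
  { message: "two", delay: 500 },
  { message: "three", delay: 10500 }
];

// concatMap-style timing (hypothetical helper): each delay starts only when
// the previous inner observable has completed, so the arrival time is the
// running sum of the delays.
function concatArrivals(list) {
  let elapsed = 0;
  return list.map(item => {
    elapsed += item.delay;
    return { message: item.message, at: elapsed };
  });
}

// mergeMap-style timing (hypothetical helper): every delay starts at
// subscription time, so each item arrives after its own delay and the
// output is ordered by arrival time rather than source order.
function mergeArrivals(list) {
  return list
    .map(item => ({ message: item.message, at: item.delay }))
    .sort((a, b) => a.at - b.at);
}

console.log(concatArrivals(items).map(x => x.at)); // [ 100, 600, 11100 ]
console.log(mergeArrivals(items).map(x => x.at));  // [ 100, 500, 10500 ]
```

So if each `delay` is meant as an offset from the start of the stream, `.mergeMap` matches that reading; if it is meant as a pause after the previous message, `.concatMap` does.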
Because these are standard Rx operators, cancellation support is built in: unsubscribe from the stream and all pending emissions are cancelled.
Upvotes: 4