Reputation: 12803
What would be the most idiomatic way to yield values of an Observable by a specific amount of time? For example, let's say I have an Observable created from a big Array and I want to yield a value every 2 seconds. Is a combination of interval
and selectMany
the best way?
Upvotes: 36
Views: 23939
Reputation: 5403
A simple one-liner:
const delayMs = 2000
from([1, 2, 3]).pipe(concatMap(x => of(x).pipe(delay(delayMs)))).subscribe(item => {
});
Upvotes: 1
Reputation: 19441
RxJs 6 code that emits the first item immediately and delays the remaining items:
import { of, EMPTY, concat } from "rxjs";
import { concatMap, delay } from "rxjs/operators";
const delayed$ = EMPTY.pipe(delay(1000));
console.log("start");
of(1, 2, 3, 4)
.pipe(concatMap(v => concat(of(v), delayed$)))
.subscribe({
next: console.log
});
idea:
concat
) that will output the item immediately (of(v)
) and then emit an EMPTY
observable after the delayconcatMap
all emitted observables will be emitted in the correct orderUpvotes: 0
Reputation: 2221
//create a custom operator
const delayEach=(millis)=>(o)=>o.pipe(concatMap((x)=>of(x).pipe(delay(millis))))
of(1, 2, 3, 4, 5)
.pipe(delayEach(1000))
.subscribe(console.log);
Upvotes: 0
Reputation: 1
Building on the zip solutions by farincz and user3587412, here is how it works in RxJS v6
const { zip, from, timer } = require("rxjs")
const { map } = require("rxjs/operators")
const input = [1, 2, 3, 4, 5]
const delay = 2000
zip(
from(input),
timer(0, delay)
).pipe(
map(([ delayedInput, _timer ]) => delayedInput) // throw away timer index
).subscribe(
console.log
)
Upvotes: 0
Reputation: 473
For RxJS v6 getting the next one with a delay of 2 seconds.
Example 1. concatMap:
import {of} from 'rxjs';
import {concatMap, delay} from 'rxjs/operators';
of(1, 2, 3, 4, 5)
.pipe(
concatMap(x => of(x)
.pipe(
delay(2000))
)
)
.subscribe({
next(value) {
console.log(value);
}
});
Example 2. map + concatAll:
import {of} from 'rxjs';
import {concatAll, delay, map} from 'rxjs/operators';
of(1, 2, 3, 4, 5)
.pipe(
map(x => of(x)
.pipe(
delay(2000))
),
concatAll()
)
.subscribe({
next(value) {
console.log(value);
}
});
Upvotes: 11
Reputation: 18125
While Brandon's answer gets the gist of the idea, here's a version which yields the first item immediately, then puts time between the following items.
var delay = Rx.Observable.empty().delay(2000);
var items = Rx.Observable.fromArray([1,2,3,4,5])
.map(function (x) {
return Rx.Observable.return(x).concat(delay); // put some time after the item
})
.concatAll();
Updated for newer RxJS:
var delay = Rx.Observable.empty().delay(2000);
var items = Rx.Observable.fromArray([1,2,3,4,5])
.concatMap(function (x) {
return Rx.Observable.of(x).concat(delay); // put some time after the item
});
Upvotes: 9
Reputation: 92324
Since this wasn't mentioned, I think concatMap
combined with delay
is pretty readable.
Rx.Observable.fromArray([1, 2, 3, 4, 5])
.concatMap(x => Rx.Observable.of(x).delay(1000));
See https://codepen.io/jmendes/pen/EwaPzw
Upvotes: 4
Reputation: 1053
For RxJS 5:
Rx.Observable.from([1, 2, 3, 4, 5])
.zip(Rx.Observable.timer(0, 2000), x => x)
.subscribe(x => console.log(x));
Upvotes: 6
Reputation: 6418
Agree that zip is a clean approach. Here is a reusable function to generate an interval stream for an array:
function yieldByInterval(items, time) {
return Rx.Observable.from(items).zip(
Rx.Observable.interval(time),
function(item, index) { return item; }
);
}
// test
yieldByInterval(['A', 'B', 'C'], 2000)
.subscribe(console.log.bind(console));
This builds on farincz's answer, but is slightly shorter by using .zip
as an instance method.
Also, I used Rx.Observable.from()
because Rx.Observable.fromArray()
is deprecated.
Upvotes: 4
Reputation: 5173
I think that using zip produce better and more readable code, still using just 3 observables.
var items = ['A', 'B', 'C'];
Rx.Observable.zip(
Rx.Observable.fromArray(items),
Rx.Observable.timer(2000, 2000),
function(item, i) { return item;}
)
Upvotes: 26
Reputation: 39222
For your specific example, the idea is to map each value from the array to an observable that will yield its result after a delay, then concatenate the resulting stream of observables:
var delayedStream = Rx.Observable
.fromArray([1, 2, 3, 4, 5])
.map(function (value) { return Rx.Observable.return(value).delay(2000); })
.concatAll();
Other examples might indeed make use of timer
or interval
. It just depends.
For example, if your array is really really big, then the above will cause a fair amount of memory pressure (because it is creating N
observables for a really large N
). Here is an alternative that uses interval
to lazily walk the array:
var delayedStream = Rx.Observable
.interval(2000)
.take(reallyBigArray.length) // end the observable after it pulses N times
.map(function (i) { return reallyBigArray[i]; });
This one will yield the next value from the array every 2 seconds until it has iterated over the entire array.
Upvotes: 37