cdxf

Reputation: 5648

How to delay each emission of a sequence in RxJS

I have an observable:

  const messages: string[] = ['a', 'b', 'c'];
  const source = from(messages);

How can I delay it so that, whenever anyone subscribes, it waits n seconds before emitting each item? For example:

source.subscribe(i => console.log(i));
// output  ...n seconds... 'a' ...n seconds... 'b' ...n seconds... 'c'

Upvotes: 3

Views: 2852

Answers (4)

fingerpich

Reputation: 9330

I had the same problem and solved it with the following code:

const {from, of} = rxjs;
const {concatMap, delay} = rxjs.operators;

from(['a', 'b', 'c'])
   .pipe(concatMap((msg) => of(msg).pipe(delay(1000))))
   .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>

Upvotes: 4

a better oliver

Reputation: 26828

You can combine the stream with an interval using zip:

zip(
  from(['a', 'b', 'c', 'd']),
  interval(1000),
  (a, b) => a
)
.subscribe(console.log);

zip combines the nth elements of each stream into an array. That's why we use a selector function: (a, b) => a. It ensures that only elements from the first stream are emitted; the interval stream is only used to delay emission.
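To see the pairing without waiting on timers, here is a minimal sketch that zips two synchronous streams. The `pace` helper and its parameter names are made up for illustration, and it pipes to `map` instead of passing the selector argument, which, as far as I know, is deprecated in newer RxJS versions.

```typescript
import { from, zip, Observable } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical helper: emits one item from `items` for each value of `ticks`.
function pace<T>(items: T[], ticks: Observable<unknown>): Observable<T> {
  return zip(from(items), ticks).pipe(
    map(([item]) => item) // keep the item, drop the tick
  );
}

// What zip produces before the tick is dropped: the nth values paired up.
const pairs: Array<[string, number]> = [];
zip(from(['a', 'b', 'c']), from([0, 1, 2])).subscribe(p => pairs.push(p));
console.log(pairs); // [['a', 0], ['b', 1], ['c', 2]]

// With a synchronous tick source the items come out immediately, in order.
const paced: string[] = [];
pace(['a', 'b', 'c'], from([0, 1, 2])).subscribe(v => paced.push(v));
console.log(paced); // ['a', 'b', 'c']

// Timed usage (import interval from 'rxjs'):
// pace(['a', 'b', 'c', 'd'], interval(1000)).subscribe(console.log);
```

With interval(1000) as the tick source, the first item also waits one second, because interval emits its first value only after the first period has elapsed.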

Upvotes: 6

cdxf

Reputation: 5648

const source = from(['a', 'b', 'c', 'd']);
const delayPerElements = source
  .pipe(map(v => of(v).pipe(delay(1000))), concatAll());
delayPerElements.subscribe(it => console.log(it));
// ... a ... b ... c ... d

I don't know if this is the best way, but it works for me. I hope this helps someone in the future.

As @Igno Burk suggested:

const source = from(['a', 'b', 'c', 'd']);
const delayPerElements = source
  .pipe(concatMap(v => of(v).pipe(delay(1000))));
delayPerElements.subscribe(it => console.log(it));
// ... a ... b ... c ... d
return delayPerElements;
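The two versions above should behave the same, since concatMap(fn) is shorthand for map(fn) followed by concatAll(). A small sketch to check the ordering without timers: the `inner` and `collect` helpers are hypothetical, and `inner` is synchronous here only so the result can be inspected right away (in the answer it would be of(v).pipe(delay(1000))).

```typescript
import { from, of, Observable } from 'rxjs';
import { concatAll, concatMap, map } from 'rxjs/operators';

// Hypothetical inner observable per item: the value, then its upper-case form.
const inner = (v: string): Observable<string> => of(v, v.toUpperCase());

// Hypothetical helper: gathers everything a synchronous observable emits.
function collect(obs: Observable<string>): string[] {
  const out: string[] = [];
  obs.subscribe(v => out.push(v));
  return out;
}

const viaConcatAll = collect(from(['a', 'b']).pipe(map(inner), concatAll()));
const viaConcatMap = collect(from(['a', 'b']).pipe(concatMap(inner)));

// Both run each inner observable to completion before starting the next one.
console.log(viaConcatAll); // ['a', 'A', 'b', 'B']
console.log(viaConcatMap); // ['a', 'A', 'b', 'B']
```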

Upvotes: 0

CozyAzure

Reputation: 8468

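You can use the delay operator. Note that it shifts the whole sequence by the given time and does not add a gap between items, so all three values arrive together after the delay: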

const messages: string[] = ['a', 'b', 'c'];
const source = from(messages).pipe(
    delay(1000) // shifts every emission by 1 second
);

Remember to import delay:

import { delay } from 'rxjs/operators';
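One way to compare delay on its own with the per-item concatMap approach from the other answers is to run both under virtual time with TestScheduler from rxjs/testing. This is only a sketch: the `timeline` helper is hypothetical, and it records the virtual millisecond at which each value arrives instead of waiting on real timers.

```typescript
import { from, of, Observable } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

// Hypothetical helper: subscribes under virtual time and records
// [virtual milliseconds, value] for every emission.
function timeline(build: () => Observable<string>): Array<[number, string]> {
  const log: Array<[number, string]> = [];
  // The assertion callback is not used in this sketch, so a no-op is passed.
  const scheduler = new TestScheduler(() => {});
  scheduler.run(() => {
    build().subscribe(v => log.push([scheduler.now(), v]));
  });
  return log;
}

const messages = ['a', 'b', 'c'];

// delay alone: every value is shifted by the same 1000 ms.
const shifted = timeline(() => from(messages).pipe(delay(1000)));

// concatMap + delay: each value waits for the previous one to finish.
const spaced = timeline(() =>
  from(messages).pipe(concatMap(m => of(m).pipe(delay(1000))))
);

console.log(shifted); // [[1000, 'a'], [1000, 'b'], [1000, 'c']]
console.log(spaced);  // [[1000, 'a'], [2000, 'b'], [3000, 'c']]
```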

Upvotes: 0
