mlemmy
mlemmy

Reputation: 93

RxJS: delaying incoming observables

I have a SnackbarComponent which is responsbile for displaying a Snackbar with informations for users. Notifications are coming from NotificationService to which I subscribe in my SnackbarComponent. Incoming messages are pushed to messages array and shifted from it after 3 seconds. In my html template I iterate over messages and display them.

export class SnackbarComponent implements OnInit, OnDestroy {
  private notificationSubscription: Subscription;
  messages: NotificationMessage[] = [];

  constructor(private notificationService: NotificationService) {
  }

  ngOnInit() {
    this.notificationSubscription = this.notificationService.notification$
      .subscribe(
        (message: NotificationMessage) => {
          this.messages.push(message);
          setTimeout(() => {
            this.messages.shift();
          }, 3000);
        }
      )
  }

  ngOnDestroy(): void {
    this.notificationSubscription.unsubscribe();
  }

The problem is, I'd like to display only 3 messages at a time and delay others (or display immediately if less than 3 messages currently).

I've got a working solution which looks like this:

private notificationSubscription: Subscription;
messages: NotificationMessage[] = [];
messagesQueue: NotificationMessage[] = [];


  ngOnInit() {
    this.notificationSubscription = this.notificationService.notification$
      .subscribe(
        (message: NotificationMessage) => {
          if (this.messages.length < 3) {
            this.messages.push(message);
            this.timeoutMessage();
          } else {
            this.messagesQueue.push(message);
          }
        }
      )
  }

  private timeoutMessage() {
    setTimeout(() => {
      this.messages.shift();
      if (this.messagesQueue.length > 0) {
        this.messages.push(this.messagesQueue.shift());
        this.timeoutMessage();
      }
    }, 3000);
  }

If there are 3 or messages, new messages are pushed to messagesQueue and in timeoutMessage() objects from messagesQueue are shifted to messages array.

But I don't find this solution elegant. Is there a RxJS way to somehow delay messages with some pipe operators?

Upvotes: 4

Views: 681

Answers (2)

Fateh Mohamed
Fateh Mohamed

Reputation: 21357

you can use delay operator or conditional delay delayWhen to delay based on a condition

import {delay} from 'rxjs/operators';

this.notificationService.notification$.pipe(
  delay(3000)
).subscribe();

example with delayWhen

this.notificationService.notification$.pipe(
  delayWhen(message => this.messages.length < 3 ? interval(0) : interval(3000))
).subscribe();

Upvotes: 3

CruelEngine
CruelEngine

Reputation: 2841

You can use BufferCount operator to do it . It will emit values 3 at a time and i've added a second delay just to make sure .

import {bufferCount,delay } from 'rxjs/operators';

this.notificationService.notification$.pipe(
  bufferCount(3),delay(3000)
).subscribe();

Here is a stackblitz example:

Upvotes: 0

Related Questions