Jon Sud
Jon Sud

Reputation: 11661

How to implement Queue to functions call in rxjs?

I have a button that call save function that call save function in the service.

    <button (click)="save()">save</button>
    save() {
     this.someService.save().subscribe(r => {
       console.log({ r });
     });
    }

I want each click on the button the action(the save method from the service) will insert to a queue.

After the current save method on the service is complete, then pull the next from queue and call again to save method from the service and so on until nothing in the queue.

I try something like this but I'm not sure why it's works only once..

* a note: the save function in SomeService I can't change.

    save() {
     this.someService.addToQueue(true).subscribe(r => {
       console.log({ r });
     });
    }
@Injectable({
  providedIn: "root"
})
export class SomeService {
  queue = new Subject();

  constructor() {
    this.queue.pipe(concatMap(a => this.save(a)));
  }

  addToQueue(saveAndExit) {
    this.queue.next(saveAndExit);

    return this.queue;
  }

  save(saveAndExit) {
    return Observable.create(obs => {
      setTimeout(() => {
        obs.next({ data: true });
      }, 5000);
    });
  }
}

Demo on stackblitz

Upvotes: 2

Views: 556

Answers (1)

Julius Dzidzevičius
Julius Dzidzevičius

Reputation: 11000

The problem with concatMap here is that it does not subscribe to the next inner Observable until previous completes. If you complete it:

  save(saveAndExit) {
    return Observable.create(obs => {
      setTimeout(() => {
        obs.next({ data: true });
        obs.complete()
      }, 5000);
    });
  }

Your problem would be solved, but as you can not change that function, you need to manually complete it, with f.ex. take(1) or first():

this.queue.pipe(concatMap(a => this.save(a).pipe(take(1)) ));

Upvotes: 2

Related Questions