Mr.H.

Reputation: 1045

Use RxJs to empty a queue

I am trying to empty a queue/buffer of unknown length whose end is marked by an empty string.

Example: in->["","P2","P1"]->out

There is a call that returns the next element in the queue: function getNext(): Observable<string>. This Observable is cold and short-lived (it emits one value and completes), so the only thing I can do is:

getNext().subscribe(console.log,console.error,()=>console.log("complete"));
--> "P1"
--> "complete"
getNext().subscribe(console.log,console.error,()=>console.log("complete"));
--> "P2"
--> "complete"
getNext().subscribe(console.log,console.error,()=>console.log("complete"));
--> ""
--> "complete"

I would like to turn it into a semi-short-lived observable that emits values until the empty string is emitted and then completes.

theBetterGetNext().subscribe(console.log,console.error,()=>console.log("complete"));
--> "P1"
--> "P2"
--> ""
--> "complete"

For a fixed length that would be easy: of(1,2,3).pipe(mergeMap(()=>getNext())). But I am struggling with the unknown length. Could you please guide me to a solution?

Upvotes: 0

Views: 186

Answers (2)

olivarra1

Reputation: 3399

I think repeatWhen is your operator:

const theBetterGetNext = () => getNext().pipe(
  repeatWhen(value => value === "" ? EMPTY : of(null))
);

theBetterGetNext().subscribe(
  console.log,
  console.error,
  () => console.log("complete")
);

This should resubscribe to the source until the emitted value is an empty string (=== "").

Edit: I'm making an assumption here: You don't need to call getNext again to get a new value, but you can reuse the initial observable...
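One caveat on the snippet above: as far as I know, the notifier passed to repeatWhen is called with an observable of completion notifications, not with each emitted value, so the comparison against "" may never match. Here is a minimal sketch of the same resubscribe idea using repeat() plus takeWhile with its inclusive flag. It assumes RxJS 6.4 or later, and that resubscribing to the observable returned by getNext() fetches the next element. The queue array and this getNext are hypothetical stand-ins so the example can run on its own:

```typescript
import { Observable, defer, of } from "rxjs";
import { repeat, takeWhile } from "rxjs/operators";

// Hypothetical stand-in for the real getNext(): a cold observable that
// pops one element per subscription, emits it, and completes.
const queue: string[] = ["P1", "P2", ""];
function getNext(): Observable<string> {
  return defer(() => of(queue.shift() ?? ""));
}

const theBetterGetNext = (): Observable<string> =>
  getNext().pipe(
    // Resubscribe to the source every time it completes.
    repeat(),
    // Pass values through until "" arrives; `true` also emits the "".
    takeWhile(value => value !== "", true)
  );

// Collect the notifications so the result can be inspected.
const seen: string[] = [];
theBetterGetNext().subscribe({
  next: value => seen.push(value),
  error: err => console.error(err),
  complete: () => seen.push("complete"),
});

console.log(seen);
```

If the real getNext() only does its work when called (rather than when subscribed), wrapping the call as defer(() => getNext()) before repeat() should give the same effect.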

I just found another solution if you need to call getNext():

const theBetterGetNext = () => getNext().pipe(
  expand(value => value === "" ? EMPTY : getNext())
);

theBetterGetNext().subscribe(
  console.log,
  console.error,
  () => console.log("complete")
);
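To try the expand version without the real backend, here is a small self-contained sketch. The queue array and this getNext are hypothetical stand-ins, not the asker's actual implementation. The extra "P3" after the delimiter is only there to show that polling stops once the empty string has been seen:

```typescript
import { EMPTY, Observable, defer, of } from "rxjs";
import { expand } from "rxjs/operators";

// Hypothetical stand-in for the real getNext(): each subscription pops
// one element from the queue, emits it, and completes.
const queue: string[] = ["P1", "P2", "", "P3"];
function getNext(): Observable<string> {
  return defer(() => of(queue.shift() ?? ""));
}

const theBetterGetNext = (): Observable<string> =>
  getNext().pipe(
    // expand re-emits each value and subscribes to the observable returned
    // for it; returning EMPTY for "" ends the recursion.
    expand(value => (value === "" ? EMPTY : getNext()))
  );

// Collect the notifications so the result can be inspected.
const emitted: string[] = [];
theBetterGetNext().subscribe({
  next: value => emitted.push(value),
  error: err => console.error(err),
  complete: () => emitted.push("complete"),
});

console.log(emitted); // values up to and including "", then "complete"
console.log(queue);   // whatever was queued after the delimiter is untouched
```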

Upvotes: 4

Steve Holgado

Reputation: 12071

You could use the takeWhile operator to take values only while they are not an empty string.

Once you get an empty string, the observable will complete:

of(1, 2, 3)
  .pipe(
    mergeMap(() => getNext()),
    takeWhile(val => val !== "")
  )
  .subscribe(
    console.log,
    console.error,
    () => console.log("complete")
  );
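One thing to watch for: by default takeWhile does not emit the value that fails the predicate, so the "" from the question's expected output would be dropped. A minimal sketch of the difference, assuming RxJS 6.4 or later (where takeWhile accepts a second, inclusive argument) and using a fixed of(...) source as a stand-in for the queue:

```typescript
import { of } from "rxjs";
import { takeWhile } from "rxjs/operators";

// Stand-in source: the empty string marks the end, "P3" must never appear.
const source$ = of("P1", "P2", "", "P3");

// Default behaviour: completes on "" without emitting it.
const exclusive: string[] = [];
source$
  .pipe(takeWhile(val => val !== ""))
  .subscribe(val => exclusive.push(val));

// inclusive = true: emits the "" that ended the sequence, then completes.
const inclusive: string[] = [];
source$
  .pipe(takeWhile(val => val !== "", true))
  .subscribe(val => inclusive.push(val));

console.log(exclusive);
console.log(inclusive);
```

Note that of(1, 2, 3) in the answer still caps the number of getNext() calls at three, so on its own it does not cover a queue of unknown length.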

Upvotes: 2
