Picci

Reputation: 17762

Implement a loop logic within an rxjs pipe

I have a class, QueueManager, which manages some queues.

QueueManager offers 3 APIs: createQueue, deleteQueue and listQueues.

deleteQueue is a fire-and-forget API: it does not return any signal when it has finished its work and the queue is actually gone. At the same time, createQueue fails if a queue with the same name already exists.

listQueues() returns the names of the queues managed by QueueManager.

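I need to create a piece of logic which deletes a queue and recreates it. So my idea is to do something like this: issue deleteQueue, poll listQueues until the queue name no longer appears in the list, and then call createQueue.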

I do not think I can use the retry or repeat operators, since they resubscribe to the source. In this case that would mean issuing the deleteQueue command more than once, which is something I need to avoid.

So what I have thought of doing is something like this:

deleteQueue(queueName).pipe(
  map(() => [queueName]),
  expand(queuesToDelete => {
    return listQueues().pipe(delay(100))  // 100 ms of delay between checks
  }),
  filter(queues => !queues.includes(queueName)),
  first() // to close the stream when the queue to cancel is not present any more in the list
)

This logic actually seems to work, but it looks a bit clumsy to me. Is there a more elegant way to address this problem?

Upvotes: 1

Views: 357

Answers (1)

NickL

Reputation: 1960

The line map(() => [queueName]) is needed because expand also emits the values from its source observable, so the first value that reaches filter must be an array that still contains queueName. I don't think that's obvious from just looking at it.

You can use repeat; you just need to make it resubscribe to the listQueues observable rather than to deleteQueue.

I've also put the delay before listQueues; otherwise you're delaying the emission of a value that the API has already returned.

const { timer, concat, operators } = rxjs; 
const { tap, delay, filter, first, mapTo, concatMap, repeat } = operators;

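const queueName = 'A';
// Mock: emits after 100 ms, regardless of when the queue actually disappears
const deleteQueue = (queueName) => timer(100);
// Mock: each subscription emits three listings, one second apart;
// only the last one no longer contains 'A'
const listQueues = () => concat(
  timer(1000).pipe(mapTo(['A', 'B'])),
  timer(1000).pipe(mapTo(['A', 'B'])),
  timer(1000).pipe(mapTo(['B'])),
);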

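const source = deleteQueue(queueName).pipe(
  tap(() => console.log('queue deleted')),
  // deleteQueue is subscribed to only once; the repeated part lives in the inner observable
  concatMap(() =>
    timer(100).pipe(  // wait 100 ms before each check
      concatMap(listQueues),
      tap(queues => console.log('queues', queues)),
      repeat(),  // resubscribe to timer + listQueues whenever they complete
      filter(queues => !queues.includes(queueName)),
      first()  // complete as soon as the queue is no longer listed
    )
  )
);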

source.subscribe(x => console.log('next', x), e => console.error(e), () => console.log('complete'));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.4/rxjs.umd.js"></script>

Upvotes: 1
