Reputation: 17762
I have a class, QueueManager, which manages some queues.
QueueManager offers 3 APIs
deleteQueue(queueName: string): Observable<void>
createQueue(queueName: string): Observable<string>
listQueues(): Observable<string>
: Observable`deleteQueue
is a fire-and-forget API, in the sense that it does not return any signal when it has completed its work and deleted the queue. At the same time createQueue
fails if a queue with the same name already exists.
listQueues()
returns the names of the queues managed by QueueManager.
I need to create a piece of logic which deletes a queue and recreates it. So my idea is to do something like
deleteQueue(queueName)
methodlistQueues
method until the result returned shows that queueName
is not there any morecreateQueue(queueName)
I do not think I can use retry
or repeat
operators since they resubscribe to the source, which in this case would mean to issue the deleteQueue
command more than once, which is something I need to avoid.
So what I have thought to do is something like
deleteQueue(queueName).pipe(
map(() => [queueName]),
expand(queuesToDelete => {
return listQueues().pipe(delay(100)) // 100 ms of delay between checks
}),
filter(queues => !queues.includes(queueName)),
first() // to close the stream when the queue to cancel is not present any more in the list
)
This logic seems actually to work, but looks to me a bit clumsy. Is there a more elegant way to address this problem?
Upvotes: 1
Views: 357
Reputation: 1960
The line map(() => [queueName])
is needed because expand
also emits values from its source observable, but I don't think that's obvious from just looking at it.
You can use repeat
, you just need to subscribe to the listQueues
observable, rather than deleteQueue
.
I've also put the delay before listQueues
, otherwise you're waiting to emit a value that's already returned from the API.
const { timer, concat, operators } = rxjs;
const { tap, delay, filter, first, mapTo, concatMap, repeat } = operators;
const queueName = 'A';
const deleteQueue = (queueName) => timer(100);
const listQueues = () => concat(
timer(1000).pipe(mapTo(['A', 'B'])),
timer(1000).pipe(mapTo(['A', 'B'])),
timer(1000).pipe(mapTo(['B'])),
);
const source = deleteQueue(queueName).pipe(
tap(() => console.log('queue deleted')),
concatMap(() =>
timer(100).pipe(
concatMap(listQueues),
tap(queues => console.log('queues', queues)),
repeat(),
filter(queues => !queues.includes(queueName)),
first()
)
)
);
source.subscribe(x => console.log('next', x), e => console.error(e), () => console.log('complete'));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.4/rxjs.umd.js"></script>
Upvotes: 1