XeniaSis

Reputation: 2354

rxjs check if stream is empty before handling data

We have the following stream.

const recorders = imongo.listCollections('recorders')
    .flatMapConcat(names => {
        const recorders = names
            .map(entry => entry.name)
            .filter(entry => !_.contains(
                ['recorders.starts',
                 'recorders.sources',
                 'system.indexes',
                 'system.users'],
                entry));
        console.log(recorders);
        return Rx.Observable.fromArray(recorders);
    });

recorders.isEmpty()
    .subscribe(
        empty => {
            if(empty) {
                logger.warn('No recorders found.');
            }
        },
        () => {}
    );

recorders.flatMapConcat(createRecorderIntervals)
    .finally(() => process.exit(0))
    .subscribe(
        () => {},
        e => logger.error('Error while updating: %s', e, {}),
        () => logger.info('Finished syncing all recorders')
    );

If the stream is empty, we don't want to call createRecorderIntervals. The code above works. However, checking whether the stream is empty causes the console.log to be executed twice. Why is this happening? Can I fix it somehow?

EDIT: After rethinking it thanks to @Martin's answer, I went with the following:

const recorders = imongo.listCollections('recorders')
    .flatMapConcat(names => {
        const recorders = names
            .map(entry => entry.name)
            .filter(entry => !_.contains(
                ['recorders.starts',
                 'recorders.sources',
                 'system.indexes',
                 'system.users'],
                entry));

        if(!recorders.length) {
            logger.warn('No recorders found.');
            return Rx.Observable.empty();
        }

        return Rx.Observable.fromArray(recorders);
    })
    .flatMapConcat(createRecorderIntervals)
    .finally(() => scheduleNextRun())
    .subscribe(
        () => {},
        e => logger.error('Error while updating: %s', e, {}),
        () => logger.info('Finished syncing all recorders')
    );

Upvotes: 1

Views: 2332

Answers (1)

martin

Reputation: 96999

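Every call to subscribe() on an Observable builds and runs the entire chain of operators again. You subscribe to recorders twice (once through isEmpty() and once through flatMapConcat(createRecorderIntervals)), so imongo.listCollections('recorders') is called twice and the console.log inside the first flatMapConcat runs twice as well.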
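
To illustrate the re-subscription behaviour, here is a minimal sketch in plain JavaScript. It does not use RxJS: createCold is a hand-rolled, hypothetical stand-in for a cold Observable, and the producer function stands in for imongo.listCollections('recorders'). It only shows that each subscribe() call runs the producer again.

```javascript
// Hand-rolled stand-in for a cold Observable (an assumption for illustration,
// not the RxJS implementation): the producer runs once per subscribe() call.
function createCold(producer) {
    return {
        subscribe(onNext, onError, onCompleted) {
            producer({ onNext, onError, onCompleted });
        }
    };
}

// Counts how often the "source" is actually queried.
let listCalls = 0;

// Hypothetical replacement for imongo.listCollections('recorders').
const recorders = createCold(observer => {
    listCalls += 1;
    ['recorders.a', 'recorders.b'].forEach(name => observer.onNext(name));
    observer.onCompleted();
});

const noop = () => {};

// First subscription, comparable to recorders.isEmpty().subscribe(...)
recorders.subscribe(noop, noop, noop);
// Second subscription, comparable to recorders.flatMapConcat(...).subscribe(...)
recorders.subscribe(noop, noop, noop);

console.log('source was queried %d times', listCalls);
```

Two subscriptions mean two queries against the source, which matches the doubled console.log in the question.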

You can insert an operator before flatMapConcat(createRecorderIntervals) that checks whether the result is empty. I have one in mind in particular, but there might be others that suit your needs even better:

  • takeWhile() - takes a predicate as an argument and completes the stream (calls onCompleted) the first time the predicate returns false.

Then your code would be like the following:

const recorders = imongo.listCollections('recorders')
    .flatMapConcat(names => {
        ...
        return Rx.Observable.fromArray(recorders);
    })
    .takeWhile(function(result) {
        // condition
    })
    .flatMapConcat(createRecorderIntervals)
    .finally(() => process.exit(0))
    .subscribe(...);

I don't know what exactly your code does but I hope you get the idea.

Edit: If you want to be notified when the entire Observable is empty, then there are multiple ways:

  • do() operator and a custom Observer object. Write a custom Observer and insert it with the do() operator before .flatMapConcat(createRecorderIntervals). This object counts how many times its next callback was called, so when the preceding Observable completes you can tell whether there was at least one result or none at all.

  • create a ConnectableObservable. This one is maybe the most similar to what you were doing at the beginning. You turn your recorders into a ConnectableObservable using the publish() operator. Then you can subscribe multiple Observers without triggering the operator chain. Once all your Observers are subscribed, you call connect() and it emits each value to all Observers:

    var published = recorders.publish();
    
    published.subscribe(createObserver('SourceA'));
    published.subscribe(createObserver('SourceB'));
    
    // Connect the source
    var connection = published.connect();
    

    In your case, you'd create two Subjects (because they act as Observable and Observer at the same time) and chain one of them with isEmpty() and the second one with flatMapConcat(). See the doc for more info: http://reactivex.io/documentation/operators/connect.html

I think the first option is actually easier for you.
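
A minimal sketch of that first option, again in plain JavaScript with hand-rolled stand-ins rather than RxJS itself. fromArray, countEmissions, run and the observer shape are hypothetical helpers written for this example; with RxJS you would pass the counting Observer to do() instead. The point is that a single subscription is enough: the counter is checked when the source completes.

```javascript
// Hand-rolled stand-in for Rx.Observable.fromArray (assumption, not RxJS).
function fromArray(items) {
    return {
        subscribe(observer) {
            items.forEach(item => observer.onNext(item));
            observer.onCompleted();
        }
    };
}

// Hypothetical do()-like helper: passes values through unchanged, counts them,
// and reports the total to onCount when the source completes.
function countEmissions(source, onCount) {
    return {
        subscribe(observer) {
            let count = 0;
            source.subscribe({
                onNext(value) {
                    count += 1;
                    observer.onNext(value);
                },
                onCompleted() {
                    onCount(count);
                    observer.onCompleted();
                }
            });
        }
    };
}

// Runs one subscription over the given names and collects what happened.
function run(names) {
    const log = [];
    countEmissions(fromArray(names), count => {
        if (count === 0) {
            log.push('No recorders found.');
        }
    }).subscribe({
        onNext: name => log.push('sync ' + name),
        onCompleted: () => log.push('done')
    });
    return log;
}

const emptyRun = run([]);
const filledRun = run(['recorders.a']);
console.log(emptyRun);
console.log(filledRun);
```

Because the check happens inside the one chain that also does the work, the source is only queried once per run.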

Upvotes: 1
