Farrukh Najmi
Farrukh Najmi

Reputation: 5316

rxjava filter for items in stream not matching any other filters on stream

A CommandManager in my app receives commands over a network protocol and publishes them on a commands PublishSubject. Specific CommandHandler classes subscribe to the commands PublishSubject such that each CommandHandler receives only commands that it is designed to handle. I also want to have a Default command handler that receives only thos commands that was matched by no other CommandHandler's filter predicate.

// Each CommandHandler subscribes to a filtered 
// stream of incoming commands matching this CommanHandler

commandManager.getCommandsPublishSubject()
.filter(getMessageFilterPredicate())
.subscribe(this::onNewMessage, 
     e -> LOGGER.error("Error getting new message", e));

Is there a way to apply a filter that matches items that were not matched by any CommandHandler?

Upvotes: 1

Views: 136

Answers (1)

akarnokd
akarnokd

Reputation: 69997

There is no standard operator for doing this. You'd have to create a custom component, not operator, that hosts the filter lambdas and more subjects for each case and the default subject.

public final class FilteringDispatcher<T> {

    static final class FilterAndSubject<T> {
         Predicate<T> predicate;
         Subject<T> subject;
    }

    final ConcurrentMap<String, FilterAndSubject<T>> filters = new ConcurrentHashMap<>();

    final Subject<T> defaultSubject = PublishSubject.<T>create().toSerialized();

    public Observable<T> registerPredicate(String name, Predicate<T> predicate) {
         Subject<T> subject = PublishSubject.<T>create().toSerialized();
         FilterAndSubject fs = new FilterAndSubject();
         fs.predicate = predicate;
         fs.subject = subject;

         FilterAndSubject old = filters.putIfAbsent(name, fs);
         return old != null ? old.subject : subject;
    }

    public Observable<T> getObservable(String name) {
        return filters.get(name).subject;
    }

    public Observable<T> getDefaultObservable() {
        return defaultSubject;
    }

    public void onNext(T item) {
        for (FilterAndSubject fs : filters.values()) {
            if (fs.predicate.test(item)) {
                fs.subject.onNext(item);
                return;
            }
        }
        defaultSubject.onNext(item);
    }
}

Upvotes: 1

Related Questions