Laurence Gonsalves

Reputation: 143074

How to efficiently split a single input Flux into many output Flux based on a computed element property?

We have some code that is given a Flux<Event> containing all events. Clients then request a Flux<Event> for a subset of these events.

The code does something like:

// Note: Kotlin code, but this question is not Kotlin-specific

/**
 * All incoming events
 */
private val allEvents: Flux<Event> = ...

/**
 * Returns a Flux of the events with the matching key.
 */
fun eventsForKey(key: String): Flux<Event> {
    return allEvents.filter { event ->
        event.key == key
    }
}

So we've got allEvents, which has all of the incoming events, and the eventsForKey function is called (potentially many times) to create a Flux<Event> of only the events with the key specified. There are potentially a lot of these filtered Flux instances that are alive concurrently.

My concern is that this is effectively doing a linear search for which "sub-Flux" to deliver each event to. That is, if there are n sub-Flux instances alive at a given moment, and a single event arrives, the event will be tested against all n filter predicates.

What I want is something that will let me specify an input Flux and a key function, and then (repeatedly) obtain an output Flux for any given key value. Each sub-Flux would behave just like the filtered ones above, but instead of executing n predicate checks for each event, each event would result in one key computation and a single dictionary lookup for the outgoing Flux. Events that don't match an existing sub-Flux should be discarded, just as they would be with a filter.
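As a rough illustration of that dispatch, here is a minimal sketch in plain Java with no Reactor types. The names `KeyedRouter`, `register` and `dispatch` are hypothetical, and plain callbacks stand in for the sub-Flux instances; it only shows the "one key computation plus one hash lookup per event" shape, not a Reactor operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration: routes each event with one key computation and
// one hash lookup, instead of testing it against every registered filter.
class KeyedRouter<K, T> {
    private final Function<T, K> keyFn;
    private final Map<K, List<Consumer<T>>> listeners = new ConcurrentHashMap<>();

    KeyedRouter(Function<T, K> keyFn) {
        this.keyFn = keyFn;
    }

    // May be called before any event with this key has arrived.
    void register(K key, Consumer<T> listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Returns how many listeners received the event; 0 means it was discarded.
    int dispatch(T event) {
        List<Consumer<T>> targets = listeners.get(keyFn.apply(event));
        if (targets == null) {
            return 0; // nobody asked for this key, so drop it as a filter would
        }
        targets.forEach(l -> l.accept(event));
        return targets.size();
    }
}

class KeyedRouterDemo {
    public static void main(String[] args) {
        // Assumption for the demo: events are strings whose key precedes ':'.
        KeyedRouter<String, String> router =
                new KeyedRouter<>(e -> e.substring(0, e.indexOf(':')));
        List<String> seenByA = new ArrayList<>();
        router.register("a", seenByA::add);
        router.dispatch("a:1");
        router.dispatch("b:1"); // no listener for "b": discarded
        router.dispatch("a:2");
        System.out.println(seenByA); // prints [a:1, a:2]
    }
}
```

The cost per event is independent of how many keys are registered, which is the property the filter-based version lacks.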

I found Flux.groupBy (which is also the accepted answer to this related question) but:

  1. Its return type is the unwieldy Flux<GroupedFlux<K,T>>:

    • I don't want the sub-Flux for a group to come into existence when its first event appears. I need to be able to obtain a Flux for a given key on demand, which is potentially before any events matching that key have arrived.

    • I also don't want to have to deal with groups that no downstream consumer has asked for. Events that don't match a key downstream consumers have asked for should just be filtered out.

  2. Its documentation states:

    Note that groupBy works best with a low cardinality of groups, so chose your keyMapper function accordingly.

    I'm not sure whether "a low cardinality of groups" means that each group needs to be small or that the number of groups needs to be small, and I don't know what "small" means in this context. I am specifically trying to deal with a situation where the number of sub-Flux instances may be large.

Does Reactor provide a way to efficiently demultiplex a Flux like this?

Upvotes: 3

Views: 1323

Answers (2)

K.Nicholas

Reputation: 11551

Doing this properly probably takes a better understanding of the core Reactor framework than I have, but it seems that you want a single Subscriber feeding multiple Publishers, looked up through a HashMap. A decorated Subscriber should be easy enough in concept:

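import java.util.HashMap;
import java.util.Map;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class DeMuxedSubscriber<T> implements Subscriber<T> {
    // One publisher per key; in this example the element itself is the key.
    // HashMap is not thread-safe, so this assumes single-threaded use.
    Map<T, SimplePublisher<T>> mapPublishers = new HashMap<>();

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE); // unbounded demand: no backpressure
    }

    @Override
    public void onNext(T s) {
        // Single lookup; drop the element if nobody requested or subscribed to this key.
        SimplePublisher<T> sp = mapPublishers.get(s);
        if (sp != null && sp.subscriber != null)
            sp.subscriber.onNext(s);
    }

    @Override
    public void onError(Throwable t) {
        mapPublishers.values().forEach(sp -> { if (sp.subscriber != null) sp.subscriber.onError(t); });
    }

    @Override
    public void onComplete() {
        mapPublishers.values().forEach(sp -> { if (sp.subscriber != null) sp.subscriber.onComplete(); });
    }

    public Publisher<T> getPublisher(T s) {
        return mapPublishers.computeIfAbsent(s, k -> new SimplePublisher<>());
    }
}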

And there is probably a class somewhere that handles being a publisher just fine but this will suffice to illustrate:

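class SimplePublisher<T> implements Publisher<T> {
    // The single downstream subscriber; null until someone subscribes.
    Subscriber<? super T> subscriber;

    @Override
    public void subscribe(Subscriber<? super T> s) {
        subscriber = s;
        // Reactive Streams requires onSubscribe before any other signal.
        // This Subscription ignores demand and cancellation (no backpressure).
        s.onSubscribe(new org.reactivestreams.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
    }
}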

And then you can make a simple example to use it. This all seems a bit awkward, and the example DeMuxedSubscriber shown here ignores backpressure, but hey, details:

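// largestPrimeFactor(int) is a helper that is not shown here.
Flux<String> wordFlux = Flux.generate(() -> 0, (i, sink) -> {
    if (i >= 100) {
        sink.complete();
        return i; // nothing should be emitted after complete()
    }
    i = i + 1;
    sink.next(Integer.toString(largestPrimeFactor(i)));
    return i;
});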
DeMuxedSubscriber<String> deMuxedSubscriber = new DeMuxedSubscriber<>();  
Flux.from(deMuxedSubscriber.getPublisher("3")).subscribe(System.out::println);
Flux.from(deMuxedSubscriber.getPublisher("5")).subscribe(System.out::println);
wordFlux.subscribe(deMuxedSubscriber);
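On the remark that some existing class probably handles being a publisher: outside Reactor, the JDK's `java.util.concurrent.SubmissionPublisher` is one such class. The sketch below swaps it in for SimplePublisher, so it uses the JDK Flow API rather than Reactor or Reactive Streams types. `FlowDemux`, `publisherFor` and `CollectingSubscriber` are hypothetical names, and this is an illustration under those assumptions, not a drop-in Reactor solution.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical demultiplexer: one upstream subscription, one publisher per key.
class FlowDemux<K, T> implements Flow.Subscriber<T> {
    private final Function<T, K> keyFn;
    private final Map<K, SubmissionPublisher<T>> outputs = new ConcurrentHashMap<>();

    FlowDemux(Function<T, K> keyFn) {
        this.keyFn = keyFn;
    }

    // Can be called before any matching item arrives. A publisher requested
    // after the upstream has terminated is never closed by this sketch.
    Flow.Publisher<T> publisherFor(K key) {
        return outputs.computeIfAbsent(key, k -> new SubmissionPublisher<>());
    }

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }

    @Override public void onNext(T item) {
        SubmissionPublisher<T> out = outputs.get(keyFn.apply(item)); // one lookup
        if (out != null) {
            out.submit(item); // blocks while a subscriber's buffer is full
        }
    }

    @Override public void onError(Throwable t) {
        outputs.values().forEach(p -> p.closeExceptionally(t));
    }

    @Override public void onComplete() {
        outputs.values().forEach(SubmissionPublisher::close);
    }
}

// Demo helper: collects items and lets the caller wait for termination.
class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> items = new CopyOnWriteArrayList<>();
    private final CountDownLatch done = new CountDownLatch(1);

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { items.add(item); }
    @Override public void onError(Throwable t) { done.countDown(); }
    @Override public void onComplete() { done.countDown(); }

    boolean awaitDone(long seconds) {
        try {
            return done.await(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class FlowDemuxDemo {
    public static void main(String[] args) {
        FlowDemux<Character, String> demux = new FlowDemux<>(s -> s.charAt(0));
        CollectingSubscriber<String> a = new CollectingSubscriber<>();
        demux.publisherFor('a').subscribe(a);

        SubmissionPublisher<String> upstream = new SubmissionPublisher<>();
        upstream.subscribe(demux);
        List.of("a1", "b1", "a2").forEach(upstream::submit);
        upstream.close(); // completion propagates to every per-key publisher

        a.awaitDone(5);
        System.out.println(a.items); // prints [a1, a2]
    }
}
```

The demultiplexer itself still requests unbounded demand from upstream; backpressure is only respected between each per-key publisher and its subscribers, where `submit` blocks once a buffer fills.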

Upvotes: 1

vins

Reputation: 15370

Your question sounded very interesting to me, so I played around with it. This solution might not be elegant, but I wanted to share it anyway!


It sounds like you need a stateful predicate that filters events once, before they are split into sub-fluxes, so that each subscriber doesn't have to do the filtering on its own. In that case, we need to maintain a list/set somewhere that holds the allowed event keys. [In my example, I am going to assume I have a flux of strings and that the first character is the event key, based on the other answer you linked in your question.]

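// map for char and the corresponding flux
// (concurrent collections, because events are emitted on another thread)
private static final Map<Character, Flux<String>> CHAR_FLUX = new ConcurrentHashMap<>();

// allowed chars. empty initially; a set gives a constant-time contains()
private static final Set<Character> ALLOWED_CHARS = ConcurrentHashMap.newKeySet();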

// stateful predicate
private static final Predicate<Character> IS_ALLOWED = c -> {
    System.out.println("IS_ALLOWED check : " + c);
    return ALLOWED_CHARS.contains(c);
};


Flux<GroupedFlux<Character, String>> groupedFluxFlux = Flux.just("a1", "b1", "c1", "a2", "b2", "c2", "a3", "b3", "c3", "a4", "b4", "c4", "a1", "b1", "c1", "a2", "b2", "c2", "a3", "b3", "c3", "a4", "b4", "c4")
        .delayElements(Duration.ofMillis(1000))
        .filter(s -> IS_ALLOWED.test(s.charAt(0)))  // check if it is allowed
        .groupBy(s -> s.charAt(0))                  // group by starts only for the allowed keys
        .cache();

Each group emitted by groupBy behaves like a UnicastProcessor: it can be consumed by only one subscriber. In your case, if you expect more than one subscriber for the same key, then we need this map. Otherwise it is not required.

Your eventsForKey method would add the key to the list/set and then return the corresponding Flux from the map.

// This filter runs once per group per subscriber (on the GroupedFlux key), not once per event.
ALLOWED_CHARS.add('a');
return CHAR_FLUX.computeIfAbsent('a', k ->
        Flux.defer(() -> groupedFluxFlux
                .filter(gf -> gf.key() == 'a')
                .flatMap(Function.identity()))
            .cache());

Assumptions:

You have a limited set of event keys (low cardinality). Otherwise the list/map might grow without bound, and groupBy might also not perform very well.

Upvotes: 1
