Reputation: 143074
We have some code that is given a Flux<Event> containing all events. Clients then request a Flux<Event> for a subset of these events. The code does something like:
// Note: Kotlin code, but this question is not Kotlin-specific
/**
 * All incoming events
 */
private val allEvents: Flux<Event> = ...

/**
 * Returns a flux of the events with the matching key.
 */
fun eventsForKey(key: String): Flux<Event> {
    return allEvents.filter { event ->
        event.key == key
    }
}
So we've got allEvents, which has all of the incoming events, and the eventsForKey function is called (potentially many times) to create a Flux<Event> of only the events with the specified key. There are potentially a lot of these filtered Flux instances alive concurrently.
My concern is that this is effectively doing a linear search for which "sub-Flux" to deliver each event to. That is, if there are n sub-Flux instances alive at a given moment, and a single event arrives, the event will be tested against all n filter predicates.
What I want is a something that will let me specify an input Flux and a key function, and then (repeatedly) obtain an output Flux for any given key value. Each sub-Flux would behave just like the filtered ones above, but instead of executing n predicate checks for each event, each event would result in one key computation and a single dictionary lookup for the outgoing Flux. Events that don't match an existing sub-Flux should be discarded, just as they would be with a filter.
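Concretely, the dispatch I have in mind looks something like this (a plain-Java sketch with made-up names, where a Consumer stands in for a downstream sub-Flux subscriber):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, framework-free sketch of the desired semantics.
// All names here are made up for illustration.
class KeyedDemux<K, E> {
    private final Function<E, K> keyFn;
    private final Map<K, Consumer<E>> outputs = new ConcurrentHashMap<>();

    KeyedDemux(Function<E, K> keyFn) {
        this.keyFn = keyFn;
    }

    // Register an output for a key on demand, possibly before any
    // matching event has arrived.
    void forKey(K key, Consumer<E> downstream) {
        outputs.put(key, downstream);
    }

    // Called once per incoming event: one key computation plus one map
    // lookup, no matter how many outputs exist. Unmatched events are
    // dropped, just as a filter would drop them.
    void onEvent(E event) {
        Consumer<E> out = outputs.get(keyFn.apply(event));
        if (out != null) {
            out.accept(event);
        }
    }

    public static void main(String[] args) {
        KeyedDemux<String, String> demux = new KeyedDemux<>(e -> e.substring(0, 1));
        List<String> as = new ArrayList<>();
        demux.forKey("a", as::add);
        demux.onEvent("a1");
        demux.onEvent("b1"); // dropped: no output registered for "b"
        demux.onEvent("a2");
        System.out.println(as); // [a1, a2]
    }
}
```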
I found Flux.groupBy (which is also the accepted answer to this related question), but:

- Its return type is the unwieldy Flux<GroupedFlux<K,T>>:
  - I don't want the sub-Flux for a group to come into existence when its first event appears. I need to be able to obtain a Flux for a given key on demand, potentially before any events matching that key have arrived.
  - I also don't want to have to deal with groups that no downstream consumer has asked for. Events that don't match a key a downstream consumer has asked for should just be filtered out.
- Its documentation states:

  Note that groupBy works best with a low cardinality of groups, so chose your keyMapper function accordingly.
I'm not sure whether "a low cardinality of groups" means each group needs to be small or the number of groups needs to be small (and I don't know what "small" means in this context). I am specifically trying to handle a situation where the number of sub-Flux instances may be large.
Does Reactor provide a way to efficiently demultiplex a Flux like this?
Upvotes: 3
Views: 1323
Reputation: 11551
Doing it properly probably takes a better understanding of the core Reactor framework than I personally have, but it seems that you want a single Subscriber and multiple Publishers driven by a HashMap. A decorated Subscriber should be easy enough in concept:
class DeMuxedSubscriber<T> implements Subscriber<T> {
    Map<T, SimplePublisher<T>> mapPublishers = new HashMap<>();

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T s) {
        SimplePublisher<T> publisher = mapPublishers.get(s);
        if (publisher != null)
            publisher.subscriber.onNext(s);
    }

    @Override
    public void onError(Throwable t) {
        mapPublishers.values().forEach(sp -> sp.subscriber.onError(t));
    }

    @Override
    public void onComplete() {
        mapPublishers.values().forEach(sp -> sp.subscriber.onComplete());
    }

    public Publisher<T> getPublisher(T s) {
        return mapPublishers.computeIfAbsent(s, k -> new SimplePublisher<T>());
    }
}
And there is probably a class somewhere that handles being a publisher just fine, but this will suffice to illustrate:
class SimplePublisher<T> implements Publisher<T> {
    Subscriber<? super T> subscriber;

    @Override
    public void subscribe(Subscriber<? super T> s) {
        subscriber = s;
    }
}
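Incidentally, the JDK does ship such a class: java.util.concurrent.SubmissionPublisher (Java 9+) implements Flow.Publisher and handles the subscriber bookkeeping, buffering, and backpressure that SimplePublisher glosses over. A hypothetical variant of the demux on top of it (names made up):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical sketch: SubmissionPublisher takes over the Publisher duties.
class FlowDemux<T> {
    private final Map<T, SubmissionPublisher<T>> publishers = new HashMap<>();

    // Obtain (or lazily create) the publisher for a key, on demand.
    SubmissionPublisher<T> publisherFor(T key) {
        return publishers.computeIfAbsent(key, k -> new SubmissionPublisher<>());
    }

    // One map lookup per event; events with no registered key are dropped.
    void onNext(T item) {
        SubmissionPublisher<T> p = publishers.get(item);
        if (p != null) {
            p.submit(item);
        }
    }

    void onComplete() {
        publishers.values().forEach(SubmissionPublisher::close);
    }

    public static void main(String[] args) {
        FlowDemux<String> demux = new FlowDemux<>();
        List<String> threes = new ArrayList<>();
        var done = demux.publisherFor("3").consume(threes::add);
        demux.onNext("3");
        demux.onNext("5"); // dropped: nobody asked for "5"
        demux.onComplete();
        done.join(); // consume()'s future completes when the publisher closes
        System.out.println(threes); // [3]
    }
}
```

Reactor can, I believe, bridge a Flow.Publisher back to a Flux via its JdkFlowAdapter; thread safety of the map and error propagation are left as details here.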
And then you can make a simple example to use it. This all seems a bit awkward, and the example DeMuxedSubscriber shown here ignores backpressure, but hey, details:
Flux<String> wordFlux = Flux.generate(() -> 0, (i, sink) -> {
    if (i >= 100) {
        sink.complete();
        return i; // don't call next() after complete()
    }
    i = i + 1;
    // largestPrimeFactor is assumed to be defined elsewhere
    sink.next(Integer.toString(largestPrimeFactor(i)));
    return i;
});
DeMuxedSubscriber<String> deMuxedSubscriber = new DeMuxedSubscriber<>();
Flux.from(deMuxedSubscriber.getPublisher("3")).subscribe(System.out::println);
Flux.from(deMuxedSubscriber.getPublisher("5")).subscribe(System.out::println);
wordFlux.subscribe(deMuxedSubscriber);
Upvotes: 1
Reputation: 15370
Your question sounded very interesting to me, and I was playing with this. This solution might not be elegant, but I simply wanted to share!
Your requirement sounds like you need some stateful predicate for filtering events before sub-fluxing, to avoid every subscriber doing the filtering on its own. In that case, we need to maintain a list/set somewhere to hold the allowed events. (In my example, I am going to assume I have a flux of strings whose first character is the event key, based on the other answer you have included in your question.)
// map for char and the corresponding flux
private static final Map<Character, Flux<String>> CHAR_FLUX = new HashMap<>();

// allowed chars; empty initially
private static final List<Character> ALLOWED_CHARS = new ArrayList<>();

// stateful predicate
private static final Predicate<Character> IS_ALLOWED = c -> {
    System.out.println("IS_ALLOWED check : " + c);
    return ALLOWED_CHARS.contains(c);
};
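As an aside, ALLOWED_CHARS.contains on a List scans linearly; backing the stateful predicate with a Set keeps the per-event check O(1). A minimal demo of the idea in isolation (made-up names):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

public class StatefulPredicateDemo {
    public static void main(String[] args) {
        // A Set gives an O(1) membership check per event, unlike
        // List.contains, which scans the whole list.
        Set<Character> allowed = new HashSet<>();
        Predicate<Character> isAllowed = allowed::contains;

        System.out.println(isAllowed.test('a')); // false: nothing registered yet
        allowed.add('a'); // a downstream consumer asks for key 'a'
        System.out.println(isAllowed.test('a')); // true
    }
}
```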
Flux<GroupedFlux<Character, String>> groupedFluxFlux = Flux.just(
        "a1", "b1", "c1", "a2", "b2", "c2", "a3", "b3", "c3", "a4", "b4", "c4",
        "a1", "b1", "c1", "a2", "b2", "c2", "a3", "b3", "c3", "a4", "b4", "c4")
    .delayElements(Duration.ofMillis(1000))
    .filter(s -> IS_ALLOWED.test(s.charAt(0))) // check if it is allowed
    .groupBy(s -> s.charAt(0)) // groups form only for the allowed keys
    .cache();
groupBy returns a UnicastProcessor, which can be consumed by only one subscriber. In your case, if you expect more than one subscriber for the same key, then we need this map; otherwise it is not required.
Your eventsForKey method would add the key to the list/set and then return the corresponding flux from the map.
// here the filter runs once per subscriber, not once per event
ALLOWED_CHARS.add('a');
return CHAR_FLUX.computeIfAbsent('a', k ->
        Flux.defer(() -> groupedFluxFlux
                .filter(gf -> gf.key() == 'a')
                .flatMap(Function.identity()))
            .cache());
Assumptions:
You have a limited set of events (low cardinality). Otherwise the list/map might grow unboundedly, and the grouped flux might also not perform very well.
Upvotes: 1