Reputation: 2570
Suppose I have 3 publishers and 1 processor. The publishers emit items of the form {key: <integer>, value: <object>, publisher_id: <string>}.
The publishers make IO operations, so:
[...] N items at a given moment.
[...] {key: <integer>, values: <list>})
I've actually already implemented a FluxProcessor
that has internal storage (a ConcurrentHashMap) to keep all the items. It manually calls request() for new items whenever CAPACITY has not been reached.
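To make the manual approach concrete, here is a minimal sketch of that idea. It is not my actual FluxProcessor: it assumes the JDK's java.util.concurrent.Flow interfaces instead of Reactor, a single already-merged upstream, and a synchronous test publisher. The names BoundedGroupingSketch, Item, GroupingSubscriber, ListPublisher and drain() are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;

public class BoundedGroupingSketch {

    /** Hypothetical item shape mirroring {key, value, publisher_id}. */
    static final class Item {
        final int key;
        final Object value;
        final String publisherId;

        Item(int key, Object value, String publisherId) {
            this.key = key;
            this.value = value;
            this.publisherId = publisherId;
        }
    }

    /** Stores items grouped by key and never holds more than `capacity` of them. */
    static final class GroupingSubscriber implements Flow.Subscriber<Item> {
        private final int capacity;
        private final Map<Integer, List<Object>> storage = new ConcurrentHashMap<>();
        private Flow.Subscription subscription;
        private int stored;
        private boolean completed;

        GroupingSubscriber(int capacity) {
            this.capacity = capacity;
        }

        @Override
        public synchronized void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(capacity); // fill the storage up to capacity
        }

        @Override
        public synchronized void onNext(Item item) {
            storage.computeIfAbsent(item.key, k -> new CopyOnWriteArrayList<>()).add(item.value);
            stored++;
        }

        @Override
        public synchronized void onError(Throwable t) {
            completed = true;
        }

        @Override
        public synchronized void onComplete() {
            completed = true;
        }

        /** Hands out the grouped items and requests as many new items as were freed. */
        synchronized Map<Integer, List<Object>> drain() {
            Map<Integer, List<Object>> batch = new HashMap<>(storage);
            storage.clear();
            int freed = stored;
            stored = 0;
            if (freed > 0 && !completed) {
                subscription.request(freed); // capacity is no longer reached, so ask for more
            }
            return batch;
        }

        synchronized int stored() {
            return stored;
        }
    }

    /** Simplified synchronous publisher for the demo; it skips the n <= 0 check the spec requires. */
    static final class ListPublisher implements Flow.Publisher<Item> {
        private final List<Item> items;

        ListPublisher(List<Item> items) {
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Item> s) {
            s.onSubscribe(new Flow.Subscription() {
                private int index;
                private boolean done;

                @Override
                public void request(long n) {
                    for (long i = 0; i < n && index < items.size(); i++) {
                        s.onNext(items.get(index++));
                    }
                    if (index == items.size() && !done) {
                        done = true;
                        s.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    index = items.size();
                    done = true;
                }
            });
        }
    }

    public static void main(String[] args) {
        List<Item> items = List.of(
                new Item(1, "a", "p1"), new Item(2, "b", "p2"), new Item(1, "c", "p3"),
                new Item(2, "d", "p1"), new Item(3, "e", "p2"));
        GroupingSubscriber processor = new GroupingSubscriber(2);
        new ListPublisher(items).subscribe(processor);
        while (processor.stored() > 0) {
            System.out.println(processor.drain());
        }
    }
}
```

With a capacity of 2, the storage never holds more than two items, and each drain() call frees room and triggers the next request(). A real implementation would also need to handle several upstream subscriptions and asynchronous delivery.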
I'd like to know whether there is built-in functionality for this in the RxJava 2 or Spring Reactor API.
Upvotes: 0
Views: 660
Reputation: 69997
Use merge, rebatchRequests and toMultimap with RxJava 2:
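// Each source emits items that expose a key and a value.
Flowable<KeyValuePublisher> source1 = ...
Flowable<KeyValuePublisher> source2 = ...
Flowable<KeyValuePublisher> source3 = ...
Flowable.merge(
        // rebatchRequests(N) requests N items from its source up front, then replenishes in batches
        source1.rebatchRequests(N),
        source2.rebatchRequests(N),
        source3.rebatchRequests(N)
    )
    // collects everything into a Map of key to Collection of values, emitted once all sources complete
    .toMultimap(kvp -> kvp.key, kvp -> kvp.value)
    .subscribe(map -> System.out.println(map));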
Upvotes: 1