IsaacLevon

Reputation: 2570

Reactive Streams: How to wait for all publishers, by key?

Suppose I have 3 publishers and 1 processor. The publishers emit items of the form {key: <integer>, value: <object>, publisher_id: <string>}.

The publishers perform IO operations, so:

I've already implemented a FluxProcessor that uses internal storage (a ConcurrentHashMap) to keep all the items. It manually calls request() for new items whenever CAPACITY has not been reached.

Is there built-in functionality for this in the RxJava 2 or Spring Reactor API?

Upvotes: 0

Views: 660

Answers (1)

akarnokd

Reputation: 69997

Use merge, rebatchRequests and toMultimap with RxJava 2:

Flowable<KeyValuePublisher> source1 = ...
Flowable<KeyValuePublisher> source2 = ...
Flowable<KeyValuePublisher> source3 = ...

Flowable.merge(
    source1.rebatchRequests(N),
    source2.rebatchRequests(N),
    source3.rebatchRequests(N)
)
.toMultimap(kvp -> kvp.key, kvp -> kvp.value)
.subscribe(map -> System.out.println(map));
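
As a rough, non-reactive illustration of the shape of the map that toMultimap delivers once all merged sources have completed, the sketch below groups a plain list with java.util.stream. It does not use RxJava and applies no backpressure. The KeyValuePublisher fields (key, value, publisherId) and the sample items are assumptions modeled on the item format in the question.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MultimapShapeSketch {

    // Hypothetical item type mirroring the question's {key, value, publisher_id}.
    public static final class KeyValuePublisher {
        public final int key;
        public final Object value;
        public final String publisherId;

        public KeyValuePublisher(int key, Object value, String publisherId) {
            this.key = key;
            this.value = value;
            this.publisherId = publisherId;
        }
    }

    // Collects every value under its key, similar in spirit to
    // toMultimap(kvp -> kvp.key, kvp -> kvp.value) on the merged Flowable.
    public static Map<Integer, List<Object>> groupByKey(List<KeyValuePublisher> merged) {
        return merged.stream().collect(Collectors.groupingBy(
                kvp -> kvp.key,
                Collectors.mapping(kvp -> kvp.value, Collectors.toList())));
    }

    public static void main(String[] args) {
        // Stand-ins for the interleaved items of source1, source2 and source3.
        List<KeyValuePublisher> merged = List.of(
                new KeyValuePublisher(1, "a1", "p1"),
                new KeyValuePublisher(1, "b1", "p2"),
                new KeyValuePublisher(2, "a2", "p1"),
                new KeyValuePublisher(1, "c1", "p3"));

        System.out.println(groupByKey(merged));
    }
}
```

Note that toMultimap only emits its map after every merged source has completed, so this approach fits finite sources; the subscriber then sees each key with all of the values collected for it.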

Upvotes: 1
