Reputation: 3084
I have multiple sources of notifications : A, B, C (all of them producing the different type of objects but having some common properties) that I want to combine into a new Stream when a certain condition is fulfilled.
Example of Streams :
A: [ ObjectA(id=1), ObjectA(id=2), ObjectA(id=3), ObjectA(id=5), ObjectA(id=4)]
B: [ ObjectB(id=1), ObjectB(id=2), ObjectB(id=3), ObjectB(id=4), ObjectB(id=5)]
C: [ ObjectC(id=1), ObjectC(id=2), ObjectC(id=3), ObjectC(id=4), ObjectC(id=5)]
The desired output:
Result: [ ObjectABC(id=1), ObjectABC(id=2), ObjectABC(id=3), ObjectA(id=4), ObjectA(id=5)]
Each ObjectABC
will be created and added to the result stream when an ObjectA
, ObjectB
and ObjectC
having the same id are received.
If no match found it should wait until a triplet having the same I'd is found.
In the examples when the current triplet items received does not match (like for the fourth iteration A's fourth item has an Id = 5 ) it should be kept until a the matching items from the other streams are processed or discarded after a period of time.
Is this can be achievable via RxJava.
Upvotes: 1
Views: 596
Reputation: 37045
I don't have an IDE to hand, but this should explain the logic:
// First, extract all of the IDs as they arrive
final Observable<Integer> ids = as.map(i -> i.id).distinct();
// Then, for each ID, extract the ObjectA, ObjectB and ObjectC instances
// and zip them together.
final Observable<ObjectABC> abcs = ids.flatMap(id -> Observable.zip(
as.filter(a -> a.id.equals(id)).take(1),
bs.filter(b -> b.id.equals(id)).take(1),
cs.filter(c -> c.id.equals(id)).take(1),
(a, b, c) -> new ObjectABC(a, b, c));
If you want to enforce ordering:
abcs.sort(Comparator.comparing(abc -> abc.id));
Upvotes: 1