Master Mind

Reputation: 3084

RxJava : combining multiple streams of objects based on a certain condition

I have multiple sources of notifications: A, B and C. They produce different types of objects that share some common properties, and I want to combine them into a new stream when a certain condition is fulfilled.

Example streams:

A: [ ObjectA(id=1), ObjectA(id=2), ObjectA(id=3), ObjectA(id=5), ObjectA(id=4)]
B: [ ObjectB(id=1), ObjectB(id=2), ObjectB(id=3), ObjectB(id=4), ObjectB(id=5)]
C: [ ObjectC(id=1), ObjectC(id=2), ObjectC(id=3), ObjectC(id=4), ObjectC(id=5)]

The desired output:

Result: [ ObjectABC(id=1), ObjectABC(id=2), ObjectABC(id=3), ObjectABC(id=4), ObjectABC(id=5)]

Each ObjectABC should be created and added to the result stream once an ObjectA, an ObjectB and an ObjectC with the same id have all been received.

If no match is found, it should wait until a triplet with the same id is available.

In the example, when the items currently received do not match (in the fourth iteration, A's fourth item has id = 5), each item should be kept until the matching items from the other streams arrive, or be discarded after a period of time.

Can this be achieved with RxJava?

Upvotes: 1

Views: 596

Answers (1)

sdgfsdh

Reputation: 37045

I don't have an IDE to hand, but this should explain the logic:

// First, extract all of the IDs as they arrive
final Observable<Integer> ids = as.map(i -> i.id).distinct();

// Then, for each ID, extract the ObjectA, ObjectB and ObjectC instances
// and zip them together.
// Note: as, bs and cs are subscribed to again for every ID, so they must
// replay earlier items (e.g. via replay().autoConnect()) or be repeatable.
final Observable<ObjectABC> abcs = ids.flatMap(id -> Observable.zip(
    as.filter(a -> a.id.equals(id)).take(1),
    bs.filter(b -> b.id.equals(id)).take(1),
    cs.filter(c -> c.id.equals(id)).take(1),
    (a, b, c) -> new ObjectABC(a, b, c)));

If you want to enforce ordering (note that this only emits once the source has completed):

abcs.sorted(Comparator.comparing(abc -> abc.id));
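The snippet above does not discard unmatched items after a period of time. As a rough sketch of that buffering rule on its own, here is a plain-Java version without RxJava. TripletJoiner, offer, pendingCount and the source indices 0/1/2 for A/B/C are hypothetical names made up for illustration, and the timeout is measured from the first item seen for an id, which is only one possible reading of "discarded after a period of time".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not an RxJava class): buffers partial triplets by id.
class TripletJoiner {
    // One slot per source (0 = A, 1 = B, 2 = C) plus the arrival time of the first item.
    private static final class Pending {
        final Object[] parts = new Object[3];
        final long firstSeenMillis;
        Pending(long firstSeenMillis) { this.firstSeenMillis = firstSeenMillis; }
        boolean complete() { return parts[0] != null && parts[1] != null && parts[2] != null; }
    }

    private final Map<Integer, Pending> pending = new HashMap<>();
    private final long timeoutMillis;

    TripletJoiner(long timeoutMillis) { this.timeoutMillis = timeoutMillis; }

    // Offers one item; returns the completed triplet [a, b, c], or null while still waiting.
    Object[] offer(int source, int id, Object item, long nowMillis) {
        // Assumption: a partial triplet is dropped once it has waited longer than the timeout.
        pending.values().removeIf(p -> nowMillis - p.firstSeenMillis > timeoutMillis);
        Pending p = pending.computeIfAbsent(id, k -> new Pending(nowMillis));
        p.parts[source] = item;
        if (!p.complete()) {
            return null;
        }
        pending.remove(id);
        return p.parts;
    }

    int pendingCount() { return pending.size(); }
}

class TripletJoinerDemo {
    // Feeds the question's sample ids one "iteration" at a time and
    // returns the ids in the order their triplets were completed.
    static List<Integer> run() {
        int[] a = {1, 2, 3, 5, 4};
        int[] b = {1, 2, 3, 4, 5};
        int[] c = {1, 2, 3, 4, 5};
        TripletJoiner joiner = new TripletJoiner(1000);
        List<Integer> done = new ArrayList<>();
        for (int i = 0; i < a.length; i++) {
            int[] ids = {a[i], b[i], c[i]};
            for (int source = 0; source < 3; source++) {
                // The iteration index stands in for a timestamp here.
                if (joiner.offer(source, ids[source], "item", i) != null) {
                    done.add(ids[source]);
                }
            }
        }
        return done;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3, 4, 5]
    }
}
```

In an Rx pipeline, the same state could sit behind a single merged stream of (source, id, item) events, which would avoid subscribing to each source once per ID. I have not tested that variant.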

Upvotes: 1
