Fran b
Fran b

Reputation: 3036

Difference between two flux

I have two fluxes where order is not important and I want to get one resultant flux with the difference. How can I do this?

Example:

Flux<Tweet> remoteTweets = Flux.just(
        new Tweet("tag1",new TweetID("text","name"),"userimage","country","place"),
        new Tweet("tag2",new TweetID("text","name"),"userimage","country","place")
);

Flux<Tweet> localTweets = Flux.just(
        new Tweet("tag1",new TweetID("text","name"),"userimage","country","place")
);

Result expected: tag2.

Upvotes: 0

Views: 1246

Answers (1)

Martin Tarj&#225;nyi
Martin Tarj&#225;nyi

Reputation: 9947

If localTweets is a finite stream and fits into memory, then the following works:

Flux<Tweet> remoteTweets = Flux.just(
        new Tweet("tag1", new TweetID("text", "name"), "userimage", "country", "place"),
        new Tweet("tag2", new TweetID("text", "name"), "userimage", "country", "place")
);

Flux<Tweet> localTweets = Flux.just(
        new Tweet("tag1", new TweetID("text", "name"), "userimage", "country", "place")
).cache(); // note cache operator here, to avoid mutiple subscription

remoteTweets
        .filterWhen(remoteTweet -> localTweets.hasElement(remoteTweet).map(hasElement -> !hasElement))
        .subscribe(System.out::println);

If the stream is finite, but not fits into memory, then you should leave the cache operator out. That will mean localTweets Flux will be subscribed to multiple times.

If the stream is infinite, you should apply some windowing strategy (e.g. check tweets only from last 10 minutes).

Upvotes: 2

Related Questions