Reputation: 3036
I have two fluxes whose element order does not matter, and I want a single resulting flux that contains the difference between them. How can I do this?
Example:
Flux<Tweet> remoteTweets = Flux.just(
    new Tweet("tag1", new TweetID("text", "name"), "userimage", "country", "place"),
    new Tweet("tag2", new TweetID("text", "name"), "userimage", "country", "place")
);
Flux<Tweet> localTweets = Flux.just(
    new Tweet("tag1", new TweetID("text", "name"), "userimage", "country", "place")
);
Expected result: tag2.
Upvotes: 0
Views: 1246
Reputation: 9947
If localTweets is a finite stream and fits into memory, then the following works:
import reactor.core.publisher.Flux;

Flux<Tweet> remoteTweets = Flux.just(
    new Tweet("tag1", new TweetID("text", "name"), "userimage", "country", "place"),
    new Tweet("tag2", new TweetID("text", "name"), "userimage", "country", "place")
);
Flux<Tweet> localTweets = Flux.just(
    new Tweet("tag1", new TweetID("text", "name"), "userimage", "country", "place")
).cache(); // note the cache operator here, to avoid multiple subscriptions to the source

remoteTweets
    // keep a remote tweet only if localTweets contains no equal element
    .filterWhen(remoteTweet -> localTweets.hasElement(remoteTweet).map(hasElement -> !hasElement))
    .subscribe(System.out::println);
If the stream is finite but does not fit into memory, leave the cache operator out. The localTweets Flux will then be subscribed to again for every element of remoteTweets.
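One caveat for both finite cases: hasElement compares elements with equals, so Tweet needs value-based equals and hashCode for the difference to come out as expected. When the local side fits into memory, the same difference can also be taken against a set instead of re-scanning localTweets for each remote tweet. Below is a minimal plain-Java sketch of that idea (no Reactor involved); the Tweet class here is a hypothetical, simplified stand-in with only a tag, and TweetDiff and difference are illustrative names. In Reactor the same shape would be roughly localTweets.collect(Collectors.toSet()).flatMapMany(local -> remoteTweets.filter(t -> !local.contains(t))).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

class TweetDiff {

    // Hypothetical, simplified stand-in for the Tweet class from the question.
    static final class Tweet {
        final String tag;

        Tweet(String tag) {
            this.tag = tag;
        }

        // Value equality: two tweets with the same tag count as the same tweet.
        @Override
        public boolean equals(Object o) {
            return o instanceof Tweet && Objects.equals(tag, ((Tweet) o).tag);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(tag);
        }

        @Override
        public String toString() {
            return "Tweet(" + tag + ")";
        }
    }

    // Returns the elements of remote that do not occur in local, in remote's order.
    static List<Tweet> difference(List<Tweet> remote, List<Tweet> local) {
        Set<Tweet> localSet = new HashSet<>(local); // O(1) membership checks
        return remote.stream()
                .filter(t -> !localSet.contains(t))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Tweet> remote = List.of(new Tweet("tag1"), new Tweet("tag2"));
        List<Tweet> local = List.of(new Tweet("tag1"));
        System.out.println(difference(remote, local)); // prints [Tweet(tag2)]
    }
}
```

Without the equals and hashCode overrides, every Tweet instance would be distinct and the result would contain both remote tweets.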
If the stream is infinite, you should apply some windowing strategy (e.g. compare only against tweets from the last 10 minutes).
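One way to picture such a windowing strategy is a small time-bounded "recently seen" index for the local side. The following is only a sketch in plain Java, not Reactor code: RecentTags, record and seenRecently are hypothetical names, tweets are identified by their tag for brevity, timestamps are passed in explicitly, and record is assumed to be called with non-decreasing timestamps. The class is not thread-safe.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

class RecentTags {
    private final Duration window;
    // Insertion-ordered: oldest entries first, assuming non-decreasing timestamps.
    private final LinkedHashMap<String, Instant> lastSeen = new LinkedHashMap<>();

    RecentTags(Duration window) {
        this.window = window;
    }

    // Remember that a local tweet with this tag was seen at the given time.
    void record(String tag, Instant seenAt) {
        lastSeen.remove(tag); // re-insert so the entry moves to the end
        lastSeen.put(tag, seenAt);
    }

    // True if the tag was recorded within the window ending at 'now'.
    boolean seenRecently(String tag, Instant now) {
        evictOlderThan(now.minus(window));
        return lastSeen.containsKey(tag);
    }

    // Drop entries from the front until one is inside the window.
    private void evictOlderThan(Instant cutoff) {
        Iterator<Map.Entry<String, Instant>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().isBefore(cutoff)) {
                it.remove();
            } else {
                break;
            }
        }
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2024-01-01T00:00:00Z");
        RecentTags recent = new RecentTags(Duration.ofMinutes(10));
        recent.record("tag1", start);
        System.out.println(recent.seenRecently("tag1", start.plus(Duration.ofMinutes(5))));
        System.out.println(recent.seenRecently("tag1", start.plus(Duration.ofMinutes(11))));
    }
}
```

In a reactive pipeline, local tweets would feed record and remote tweets would be kept only when seenRecently returns false; memory then stays bounded by the number of distinct tags seen within the window.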
Upvotes: 2