Reputation: 46
Project Reactor's Flux has a sample method (see the Flux#sample Javadoc). It changes the flux so that it emits only the most recent element at the end of each specified period.
Is it possible to tweak this behaviour so that the first element is emitted instantly and sampling applies only from the second element to the end? Basically, I want to exclude the first (and only the first) element from sampling so that it is emitted without the initial wait.
Is this achievable with built-in operators? If not, does anybody have an idea how to approach this problem?
Here is the simplest example of what I want to achieve:
Flux<String> inputFlux = Flux.just("first", "second", "third").delayElements(Duration.ofMillis(400));
Flux<String> transformed = /*do some magic with input flux*/;
StepVerifier.create(transformed)
.expectNext("first") // first should always be emitted instantly
//second arrives 400ms after first
//third arrives 400ms after second
.expectNoEvent(Duration.ofSeconds(1))
.expectNext("third") // after the sample period, the last received element should be emitted
.verifyComplete();
Upvotes: 1
Views: 3043
Reputation: 28301
By turning the source flux myFlux into a hot flux, you can easily achieve this:
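Flux<T> myFlux;
// share a single upstream subscription; it connects once both branches below have subscribed
Flux<T> sharedFlux = myFlux.publish().refCount(2);
// branch 1: pass the first element through immediately
Flux<T> first = sharedFlux.take(1);
// branch 2: sample everything after the first element
Flux<T> sampledRest = sharedFlux.skip(1).sample(Duration.ofMillis(whatever));
return Flux.merge(first, sampledRest);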
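For anyone who wants to try this end to end, here is a minimal self-contained sketch of the same idea wrapped in a helper. The class name FirstThenSample and the method firstThenSample are made up for illustration and are not part of Reactor; the sketch assumes reactor-core is on the classpath and reuses the input from the question with an arbitrary one-second sample period.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class FirstThenSample {

    // Hypothetical helper (not a Reactor API): emits the first element as soon as
    // it arrives, then samples the remaining elements once per period.
    public static <T> Flux<T> firstThenSample(Flux<T> source, Duration period) {
        // Multicast the source; it is subscribed only once both branches are attached.
        Flux<T> shared = source.publish().refCount(2);
        // Branch 1: the first element, passed through without sampling.
        Flux<T> first = shared.take(1);
        // Branch 2: everything after the first element, sampled per period.
        Flux<T> sampledRest = shared.skip(1).sample(period);
        return Flux.merge(first, sampledRest);
    }

    public static void main(String[] args) {
        // Same input as in the question: one element every 400 ms.
        Flux<String> input = Flux.just("first", "second", "third")
                .delayElements(Duration.ofMillis(400));

        // The one-second period is an assumption chosen for the demo.
        List<String> result = firstThenSample(input, Duration.ofSeconds(1))
                .collectList()
                .block();

        System.out.println(result);
    }
}
```

One thing to keep in mind: sample(Duration) starts its timer when the merged flux is subscribed, not when the first element arrives, so the first sampled element may follow "first" after less than a full period.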
Upvotes: 3
Reputation: 3955
You could achieve it with the Flux#sample(org.reactivestreams.Publisher<U>) method.
yourFlux.take(1)
        .mergeWith(yourFlux.sample(Flux.interval(yourInterval)
                                       .delaySubscription(yourFlux.take(1))));
Upvotes: 0