molok
molok

Reputation: 46

Sample all but first elements from flux in project reactor

In project reactor flux there is a sample method Flux#sample java doc. It changes flux so that it emits events only at the ends of specified periods.

Is it possible to tweak this behaviour and achieve this : on first element - emit it instantly , start sampling with delay from 2nd up to the end. Basically I want to exclude first (and only first) element from sampling so that it is emmited without initial wait.

Would it be possible to achieve using built-in operators ? If not then does anybody have an idea how to approach this problem ?

Here is a simplest example of what I want to achieve :

Flux<String> inputFlux = Flux.just("first", "second", "third").delayElements(Duration.ofMillis(400));
Flux<String> transformed = /*do some magic with input flux*/;

StepVerifier.create(transformed)
    .expectNext("first")//first should always be emmited instantly
    //second arrives 400ms after first
    //third arrives 400ms after second
    .expectNoEvent(Duration.ofSeconds(1))
    .expectNext("third")//after sample period last received element should be received 
    .verifyComplete();

Upvotes: 1

Views: 3043

Answers (2)

Simon Basl&#233;
Simon Basl&#233;

Reputation: 28301

By turning the source flux myFlux into a hot flux, you can easily achieve this:

Flux<T> myFlux;
Flux<T> sharedFlux = myFlux.publish().refCount(2);

Flux<T> first = sharedFlux.take(1);
Flux<T> sampledRest = sharedFlux.skip(1).sample(Duration.ofMillis(whatever));

return Flux.merge(first, sampledRest);

Upvotes: 3

Alexander Pankin
Alexander Pankin

Reputation: 3955

You could achieve it with Flux#sample(org.reactivestreams.Publisher<U>) method.

yourFlux.take(1)
        .mergeWith(yourFlux.sample(Flux.interval(yourInterval)
                .delaySubscription(yourFlux.take(1))))

Upvotes: 0

Related Questions