greengold
greengold

Reputation: 1333

Spring-WebFlux Flux fails with Context

I want to use Context in my Flux pipe to bypass filtering.

Here's what I have:

public Flux<Bar> realtime(Flux<OHLCIntf> ohlcIntfFlux) {
        return Flux.zip(
                ohlcIntfFlux,
                ohlcIntfFlux.skip(1),
                Mono.subscriberContext().map(c -> c.getOrDefault("isRealtime", false))
        )
                .filter(l ->
                        l.getT3() ||
                        (!l.getT2().getEndTimeStr().equals(l.getT1().getEndTimeStr())))
                .map(Tuple2::getT1)
                .log()
                .map(this::
}

which is input to this this:

    public void setRealtime(Flux<Bar> input) {
        Flux.zip(input, Mono.subscriberContext())
                .doOnComplete(() -> {
...
   })
                .doOnNext(t -> {
...
    })
    .subscribe()
}

I can tell my code in ... is not failing, I can even access the Context map, but when the first iteration completes, I get:

onContextUpdate(Context1{reactor.onNextError.localStrategy=reactor.core.publisher.OnNextFailureStrategy$ResumeStrategy@35d5ac51})

and subscriber disconnects.

So my question is whether I am using it right and what can be an issue here?

EDIT:

I have tried to repeat() the Mono.subscriberContext() when I'm using value out of it:

        return Flux.zip(
                ohlcIntfFlux,
                ohlcIntfFlux.skip(1),
                Mono.subscriberContext()
                        .map(c -> c.getOrDefault("isRealtime", new AtomicBoolean())).repeat()
        )
                .filter(l ->
                        l.getT3().get() ||
                                (!l.getT2().getEndTime().isEqual(l.getT1().getEndTime())))
                .map(Tuple2::getT1)

and set the AtomicBoolean to the context on the subscriber end and just change the value inside this variable reference, when I need the signal on the upstream, but it doesn't change at all:

        input
                .onErrorContinue((throwable, o) -> throwable.getMessage())
                .doOnComplete(() -> {
                    System.out.println("Number of trades for the strategy: " + tradingRecord.getTradeCount());
                    // Analysis
                    System.out.println("Total profit for the strategy: " + new TotalProfitCriterion().calculate(timeSeries, tradingRecord));
                })
                .doOnNext(this::defaultRealtimeEvaluator)
                .subscriberContext(Context.of("isRealtime", isRealtimeAtomic))
                .subscribe();

at least with repeat the Flux doesn't disconnect but the value I'm getting out of it is not being updated. No other clues I have.

Spring-webflux: 2.1.3.RELEASE

Upvotes: 2

Views: 2036

Answers (1)

greengold
greengold

Reputation: 1333

this works:

        input
                .onErrorContinue((throwable, o) -> throwable.getMessage())
                .doOnComplete(() -> { ... }
                .flatMap(bar -> Mono.subscriberContext()
                        .map(c -> Tuples.of(bar, c)))
                .doOnNext(this::defaultRealtimeEvaluator)
                .subscriberContext(Context.of("isRealtime", new AtomicBoolean()))
                .subscribe();

so the point is to set AtomicBoolean in my case as the cotnext and then extract this variable out of the context if you want to change it's value. the same on the upstream flux.

Upvotes: 1

Related Questions