Reputation: 100
I have a correlation ID stored in the context (via contextWrite) and I'd like to retrieve it inside doOnNext(), where I log the results of previous data transformations.
Here's an example of what I have so far:
Mono.just("message")
...
.doOnNext(it -> Mono.deferContextual(ctx -> Mono.just(ctx)
.doOnNext(c -> log.info("correlation-id: "+ c.get(CORRELATION_ID)))))
.subscribe();
Is this the only way to achieve what I want? It seems a little odd to me somehow. I'm still learning and I've been reading the context section from the Reactor doc, but I couldn't find anything that approximates what I'm trying to achieve.
UPDATE 1
I managed to achieve something like that (and didn't have to Mono.just() 2x):
Mono.deferContextual(ctx ->
Mono.just("message")
...
.doOnNext(it -> log.info("correlation-id: " + ctx.get(CORRELATION_ID)))
);
I still don't know whether that is best practice, but it looks way better than the previous one.
UPDATE 2
Reading and digging a little deeper into the code and documentation, I discovered that Mono.deferContextual() returns a Mono (my mistake for not noticing that before). Since nothing subscribes to that inner Mono when it is created inside doOnNext(), the first scenario should use flatMap() instead. To simulate the doOnNext() behavior, I'll have to refactor a little more so that flatMap() returns the same object it received.
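A minimal sketch of the flatMap() variant described above, assuming reactor-core 3.4 or later. The class name, the key value, the "abc-123" ID, and the LOGGED list (standing in for log.info so the effect can be inspected) are all hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;

public class FlatMapContextExample {
    // Hypothetical context key; the original post only names the constant.
    static final String CORRELATION_ID = "correlation-id";

    // Stand-in for log.info(...), so the side effect can be checked.
    static final List<String> LOGGED = new ArrayList<>();

    // Reads the context inside flatMap() and re-emits the same element,
    // which mimics doOnNext() while actually subscribing to the inner Mono.
    static Mono<String> withLogging(Mono<String> source) {
        return source.flatMap(it ->
            Mono.deferContextual(ctx -> {
                LOGGED.add("correlation-id: " + ctx.get(CORRELATION_ID));
                return Mono.just(it);
            }));
    }

    public static void main(String[] args) {
        String result = withLogging(Mono.just("message"))
            // contextWrite must sit downstream of the operators that read it.
            .contextWrite(ctx -> ctx.put(CORRELATION_ID, "abc-123"))
            .block();
        System.out.println(result);
        System.out.println(LOGGED);
    }
}
```

The element passes through unchanged; only the logging side effect is added.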
UPDATE 3
I think that I got to the final code with transformDeferredContextual(). According to the reactor documentation:
to access the context from the middle of an operator chain, use transformDeferredContextual(BiFunction)
Applying to my scenario:
Mono.just("message")
...
.transformDeferredContextual((originalMono, context) -> {
log.info("correlation-id: " + context.get(CORRELATION_ID));
return originalMono;
})
.subscribe();
It seems a bit odd, but till now, this seems like the best approach.
Upvotes: 1
Views: 4129
Reputation: 6255
Something similar but prettier than your final approach:
Mono.just("message")
...
.transformDeferredContextual((originalMono, context) -> originalMono.doOnNext(e -> {
log.info("correlation-id: " + context.get(CORRELATION_ID));
}))
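For completeness, a self-contained sketch of the same idea that can be run as-is, assuming reactor-core 3.4 or later. The class name, the key value, the "abc-123" ID, the map() step, and the LOGGED list (used in place of log.info) are hypothetical stand-ins for the elided parts of the chain:

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;

public class TransformContextExample {
    // Hypothetical context key; the original post only names the constant.
    static final String CORRELATION_ID = "correlation-id";

    // Stand-in for log.info(...), so the side effect can be checked.
    static final List<String> LOGGED = new ArrayList<>();

    static Mono<String> pipeline() {
        return Mono.just("message")
            // Placeholder for the elided transformations.
            .map(String::toUpperCase)
            // The BiFunction runs at subscription time; the doOnNext it
            // attaches runs later, when the element is actually emitted.
            .transformDeferredContextual((original, ctx) ->
                original.doOnNext(e ->
                    LOGGED.add("correlation-id: " + ctx.get(CORRELATION_ID)
                        + ", value: " + e)))
            // Written downstream so the operators above can see it.
            .contextWrite(ctx -> ctx.put(CORRELATION_ID, "abc-123"));
    }

    public static void main(String[] args) {
        System.out.println(pipeline().block());
        System.out.println(LOGGED);
    }
}
```

Putting the log call inside doOnNext, rather than directly in the BiFunction body, means it fires per emitted element and can see the element's value, instead of firing once when the chain is subscribed.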
Upvotes: 3