Reputation: 924
I am trying to pass subscriber Context
to the fireAndForget
method which is called inside the doOnNext
. The fireAndForget
is run also async non-blocking. How this context might be passed so the value for "key" is present? When I run the following test it passes. However, in the logs I can see that for both doOnNext
I get:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.NoSuchElementException: Context is empty
@Test
void shouldPassContextToFireAndForget() {
final Mono<String> helloWorldMono = Mono.just("hello")
.doOnNext(this::fireAndForget)
.doOnNext(name -> Mono.deferContextual(contextView -> fireAndForget(contextView, name)).subscribe())
.flatMap(name -> Mono.deferContextual(contextView -> Mono.just(name + " " + contextView.get("key"))))
.contextWrite(Context.of("key", "world"));
StepVerifier.create(helloWorldMono)
.expectNext("hello world")
.verifyComplete();
}
private Mono<String> fireAndForget(ContextView context, String name) {
return Mono.just(name)
.flatMap(value -> Mono.deferContextual(contextView -> Mono.just(value + contextView.get("key"))))
.contextWrite(context);
}
private void fireAndForget(String name) {
Mono.just(name)
.flatMap(value -> Mono.deferContextual(contextView -> Mono.just(value + contextView.get("key"))))
.subscribe();
}
Upvotes: 2
Views: 1748
Reputation: 6255
Context
is a subscribe-time concept. There are two possible approaches.
You can expose the ContextView
at the middle of the chain using transformDeferredContextual
:
final Mono<String> helloWorldMono = Mono.just("hello")
.transformDeferredContextual((original, cntx) -> original.doOnNext(name-> fireAndForget(cntx, name).subscribe()))
.flatMap(name -> Mono.deferContextual(contextView -> Mono.just(name + " " + contextView.get("key"))))
.contextWrite(Context.of("key", "world"));
Alternatively, you could take advantage of Mono.deferContextual
in order to expose the ContextView
at the start of the chain like this:
final Mono<String> helloWorldMono = Mono.deferContextual(context ->
Mono.just("hello")
.doOnNext(name -> fireAndForget(context, name).subscribe())
.flatMap(name -> Mono.just(name + " " + context.get("key")))
).contextWrite(Context.of("key", "world"));
Upvotes: 1