Reputation: 123
According to the reference guide a context added to a reactive sequence is immutable:
Use put(Object key, Object value) to store a key-value pair, returning a new Context instance.
So I am wondering whether there is some kind of workaround for updating the context of a sequence, for example by replacing the old one.
To illustrate my case, consider the following example:
@Test
public void contextGlobalTest() {
    Flux<String> chars = Flux.fromIterable(Arrays.asList("a", "b", "c", "d"));
    chars.flatMap(this::concat)
         .flatMap(m -> Mono.subscriberContext().map(ctx -> m + ctx.get("key2")))
         .subscriberContext(Context.of("key1", 1, "key2", "yy"))
         .subscribe(System.out::println);
}

private Flux<String> concat(String s) {
    Flux<String> flux = Flux.just(s + s);
    return flux.flatMap(entry -> Mono.subscriberContext()
                   .map(ctx -> entry + ctx.get("key1")))
               .subscriberContext(Context.of("key1", 2, "key2", "zz"));
}
The output is: aa2yy bb2yy cc2yy dd2yy, but I was expecting: aa2zz bb2zz cc2zz dd2zz
So is there any solution to this case?
For those who are interested, I figured out a workaround: put a mutable object into the context, then mutate an attribute of that object instead of trying to change the context itself. See that post for more details
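To make the workaround concrete without depending on Reactor, here is a minimal plain-Java sketch. `ImmutableCtx` is a hypothetical stand-in I made up for illustration (it is not Reactor's `Context`), and `AtomicReference` is just one possible choice of mutable holder. It only shows the idea: `put` gives you a new instance and leaves the old one alone, while a mutation made through a shared holder is visible to everyone who reads that holder.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class MutableHolderSketch {

    /** Hypothetical stand-in for an immutable context: put() copies, never mutates. */
    static final class ImmutableCtx {
        private final Map<Object, Object> entries;

        private ImmutableCtx(Map<Object, Object> entries) {
            this.entries = Collections.unmodifiableMap(entries);
        }

        static ImmutableCtx empty() {
            return new ImmutableCtx(new HashMap<>());
        }

        /** Returns a new context containing the extra entry; this instance is untouched. */
        ImmutableCtx put(Object key, Object value) {
            Map<Object, Object> copy = new HashMap<>(entries);
            copy.put(key, value);
            return new ImmutableCtx(copy);
        }

        @SuppressWarnings("unchecked")
        <T> T get(Object key) {
            return (T) entries.get(key);
        }
    }

    /** Builds a context, "updates" it in both ways, and reports what readers see. */
    static String demo() {
        ImmutableCtx original = ImmutableCtx.empty()
                .put("key2", "yy")
                .put("holder", new AtomicReference<>("yy"));

        // Replacing a value yields a new context; the original still holds "yy".
        ImmutableCtx replaced = original.put("key2", "zz");

        // Mutating the holder is visible through every context that shares it.
        AtomicReference<String> holder = original.get("holder");
        holder.set("zz");

        String plainValue = original.get("key2");
        AtomicReference<String> sharedHolder = original.get("holder");
        String replacedValue = replaced.get("key2");
        return plainValue + " " + sharedHolder.get() + " " + replacedValue;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "yy zz zz"
    }
}
```

The first value stays `yy` because the original context was never changed, while the holder read through that same original context already shows `zz`. In a real reactive pipeline the holder would be shared across whatever threads the sequence runs on, so a thread-safe holder seems like the safer choice.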
Upvotes: 0
Views: 3372
Reputation: 1712
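Because the context is immutable, each subscriberContext call produces a new Context instead of changing the existing one. The context written in 'concat' (key1=2, key2=zz) is therefore visible only to the operators above it inside that flatMap. Once an element leaves the inner sequence and reaches the outer operators, only the outer context (key1=1, key2=yy) is visible. If you really want to update the context, one option is to make the context itself mutable, for example: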
@Test
public void contextGlobalTest() {
    // One shared instance: every put() below changes the same underlying map.
    MutableContext mutableContext = new MutableContext();
    Flux.fromIterable(Arrays.asList("a", "b", "c", "d"))
        .flatMap(s -> Flux.just(s + s)
            .flatMap(entry -> Mono.subscriberContext()
                .map(ctx -> entry + ctx.get("key1")))
            .subscriberContext(ctx -> mutableContext.put("key1", 2).put("key2", "zz")))
        .flatMap(m -> Mono.subscriberContext().map(ctx -> m + ctx.get("key2")))
        .subscriberContext(ctx -> mutableContext.put("key1", 1).put("key2", "yy"))
        .subscribe(System.out::println);
}

// Not thread-safe: HashMap is unsynchronized, so this is only safe while
// everything runs on a single thread.
// Depending on the Reactor version, the Context interface may declare further
// abstract methods (for example size()) that also need to be implemented.
static class MutableContext implements Context {
    HashMap<Object, Object> holder = new HashMap<>();

    @Override
    @SuppressWarnings("unchecked")
    public <T> T get(Object key) {
        return (T) holder.get(key);
    }

    @Override
    public boolean hasKey(Object key) {
        return holder.containsKey(key);
    }

    // Unlike a regular Context, put() mutates in place and returns this.
    @Override
    public Context put(Object key, Object value) {
        holder.put(key, value);
        return this;
    }

    // delete() also mutates in place and returns this.
    @Override
    public Context delete(Object key) {
        holder.remove(key);
        return this;
    }

    @Override
    public Stream<Map.Entry<Object, Object>> stream() {
        return holder.entrySet().stream();
    }
}
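The scoping described at the top of this answer, which is what produces the original aa2yy output, can also be modeled in plain Java. This is only a rough sketch: `Ctx`, `inner` and `run` are hypothetical names of mine, not Reactor API. The assumption is that the inner scope works on a merged copy of the outer context, while the outer scope keeps reading its own unchanged instance.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ContextScopeSketch {

    /** Hypothetical immutable context used only for this model. */
    static final class Ctx {
        private final Map<String, Object> entries;

        private Ctx(Map<String, Object> entries) {
            this.entries = Collections.unmodifiableMap(entries);
        }

        static Ctx of(String k1, Object v1, String k2, Object v2) {
            Map<String, Object> map = new HashMap<>();
            map.put(k1, v1);
            map.put(k2, v2);
            return new Ctx(map);
        }

        /** Returns a merged copy; entries of 'other' win, this instance is untouched. */
        Ctx putAll(Ctx other) {
            Map<String, Object> copy = new HashMap<>(entries);
            copy.putAll(other.entries);
            return new Ctx(copy);
        }

        Object get(String key) {
            return entries.get(key);
        }
    }

    /** Models concat(): the inner scope reads key1 from its own derived context. */
    static String inner(String s, Ctx outer) {
        Ctx innerCtx = outer.putAll(Ctx.of("key1", 2, "key2", "zz"));
        return s + s + innerCtx.get("key1");
    }

    /** Models the outer pipeline: after the inner step, key2 is read from the outer context. */
    static String run(String s) {
        Ctx outer = Ctx.of("key1", 1, "key2", "yy");
        String fromInner = inner(s, outer);
        return fromInner + outer.get("key2");
    }

    public static void main(String[] args) {
        System.out.println(run("a")); // prints "aa2yy"
    }
}
```

The inner step sees key1=2 because it reads its own merged copy, but the "zz" written there never reaches the outer context, so the last step still appends "yy".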
Upvotes: 1
Reputation: 355
Sinks (SynchronousSink, MonoSink, FluxSink) and CoreSubscriber give you access to currentContext, but these options are not always available.
Upvotes: 1