excbot
excbot

Reputation: 123

Is there a way to update the context in Reactor 3?

According to the reference guide a context added to a reactive sequence is immutable:

Use put(Object key, Object value) to store a key-value pair, returning a new Context instance.

Then I am wondering if there is a sort of "workaround" in order to update the context of a sequence, for example in replacing the old one.

To illustrate my case, consider the following example:

 @Test
public void contextGlobalTest() {
    Flux<String> chars = Flux.fromIterable(Arrays.asList("a", "b", "c", "d"));
    chars.flatMap(this::concat)
            .flatMap(m -> Mono.subscriberContext().map(ctx -> m + ctx.get("key2")))
            .subscriberContext(Context.of("key1", 1, "key2", "yy"))
            .subscribe(System.out::println);
}

private Flux<String> concat(String s) {
    Flux<String> flux = Flux.just(s + s);
    return flux.flatMap(entry -> Mono.subscriberContext()
                    .map(ctx -> entry + ctx.get("key1")))
               .subscriberContext(Context.of("key1", 2, "key2", "zz"));
}

The output is: aa2yy bb2yy cc2yy dd2yy, but I was expecting: aa2zz bb2zz cc2zz dd2zz

So is there any solution to this case?

For those who are interested, I figure out a workaround: One need to give a mutable object to the context and then mutate attribute of that object inside the context. See that post for more details

Upvotes: 0

Views: 3372

Answers (2)

Kevin Hussey
Kevin Hussey

Reputation: 1712

Since the context is immutable, you are retrieving a new context each time, so in 'concat' the visible context(key1=2) is only within that flatmap, once it leaves and goes to the outside the flatmap, only the key1=1 context is visible. If you really want to update the context, then maybe make the context mutable like

@Test
    public void contextGlobalTest() {
        MutableContext mutableContext = new MutableContext();
        Flux.fromIterable(Arrays.asList("a", "b", "c", "d"))
                .flatMap(s -> Flux.just(s + s)
                        .flatMap(entry -> Mono.subscriberContext()
                            .map(ctx -> entry + ctx.get("key1")))
                        .subscriberContext(ctx -> mutableContext.put("key1", 2).put("key2", "zz")))
                .flatMap(m -> Mono.subscriberContext().map(ctx -> m + ctx.get("key2")))
                .subscriberContext(ctx -> mutableContext.put("key1", 1).put("key2", "yy"))
                .subscribe(System.out::println);
    }

static class MutableContext implements Context {
    HashMap<Object, Object> holder = new HashMap<>();

    @Override
    public <T> T get(Object key) {
        return (T) holder.get(key);
    }

    @Override
    public boolean hasKey(Object key) {
        return holder.containsKey(key);
    }

    @Override
    public Context put(Object key, Object value) {
        holder.put(key, value);
        return this;
    }

    @Override
    public Context delete(Object key) {
        holder.remove(key);
        return this;
    }

    @Override
    public Stream<Map.Entry<Object, Object>> stream() {
        return holder.entrySet().stream();
    }
} 

Upvotes: 1

Tuomas Kiviaho
Tuomas Kiviaho

Reputation: 355

Sinks (SynchronousSink, MonoSink, FluxSink) and CoreSubscriber give you access to currentContext, but these options are not always available.

Upvotes: 1

Related Questions