Rowan A

Reputation: 51

Project Reactor and the Java memory model

I am trying to understand what data-visibility guarantees Project Reactor provides to application code. For example, I would expect the code below to fail, but it does not, even after a million iterations. I am changing the state of a typical POJO on thread A and reading it back on thread B. Does Reactor guarantee that POJO changes are visible across threads?

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class Main {
    public static void main(String[] args) {
        Integer result = Flux.range(1, 1_000_000)
                .map(i -> {
                    Data data = new Data();
                    data.setValue(i);
                    data.setValueThreeTimes(i);
                    data.setValueObj(i + i);
                    return data;
                })
                .parallel(250)
                .runOn(Schedulers.newParallel("par", 500))
                .map(d -> {
                    d.setValueThreeTimes(d.getValueThreeTimes() + d.getValue());
                    return d;
                })
                .sequential()
                .parallel(250)
                .runOn(Schedulers.newParallel("par", 500))
                .map(d -> {
                    d.setValueThreeTimes(d.getValueThreeTimes() + d.getValue());
                    return d;
                })
                //                .sequential()
                .map(d -> {
                    if (d.getValue() * 3 != d.getValueThreeTimes()) throw new RuntimeException("data corrupt error");
                    return d;
                })
                .reduce(() -> 0, (Integer sum, Data d) -> sum + d.getValueObj() + d.getValue())
                .sequential()
                .blockLast();
    }

    static class Data {
        private int value;
        private int valueThreeTimes;
        private Integer valueObj;

        public int getValueThreeTimes() {
            return valueThreeTimes;
        }

        public void setValueThreeTimes(int valueThreeTimes) {
            this.valueThreeTimes = valueThreeTimes;
        }

        public int getValue() {
            return value;
        }

        @Override
        public String toString() {
            return "Data{" +
                    "value=" + value +
                    ", valueObj=" + valueObj +
                    '}';
        }

        public void setValue(int value) {
            this.value = value;
        }

        public Integer getValueObj() {
            return valueObj;
        }

        public void setValueObj(Integer valueObj) {
            this.valueObj = valueObj;
        }
    }

    private static <T> T identityWithThreadLogging(T el, String operation) {
        System.out.println(operation + " -- " + el + " -- " +
                Thread.currentThread().getName());
        return el;
    }
}

Upvotes: 5

Views: 1275

Answers (1)

Simon Baslé

Reputation: 28301

The Reactive Streams specification requires that, for a Publisher such as a Flux or Mono, onNext signals are delivered to a Subscriber sequentially, never concurrently.

parallel() returns a ParallelFlux, which relaxes that a bit by dividing and conquering: you get multiple "rails" that each individually stick to the spec, but taken together they do not, because the rails run in parallel with one another.

In turn, sequential() goes back to the Flux world and introduces a memory barrier to guarantee that the resulting sequence complies with the RS spec.
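
To make the "memory barrier" point concrete, here is a minimal sketch that uses only the JDK, not Reactor. It is an analogy, not a description of Reactor's internals: a plain POJO (no volatile, no synchronized) is mutated on one thread and handed to another through a java.util.concurrent queue. The Java memory model guarantees that writes made before put() are visible after the matching take(). The names HandOffDemo and countCorrupt are made up for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class HandOffDemo {

    // Plain POJO with no volatile or synchronized, like Data in the question.
    static final class Data {
        private int value;
        private int valueThreeTimes;

        int getValue() { return value; }
        void setValue(int value) { this.value = value; }
        int getValueThreeTimes() { return valueThreeTimes; }
        void setValueThreeTimes(int valueThreeTimes) { this.valueThreeTimes = valueThreeTimes; }
    }

    /** Mutates POJOs on one thread, checks them on another, and returns how many looked inconsistent. */
    public static int countCorrupt(int items) {
        BlockingQueue<Data> queue = new ArrayBlockingQueue<>(64);
        int[] corrupt = {0};

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    Data d = new Data();
                    d.setValue(i);
                    d.setValueThreeTimes(3 * i);
                    // Everything written above happens-before the matching take().
                    queue.put(d);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    Data d = queue.take();
                    if (d.getValue() * 3 != d.getValueThreeTimes()) {
                        corrupt[0]++;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // join() makes the consumer's write to corrupt[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return corrupt[0];
    }

    public static void main(String[] args) {
        System.out.println("corrupt items: " + countCorrupt(100_000)); // prints "corrupt items: 0"
    }
}
```

The check never fails here because the queue provides the happens-before edge between the two threads. By analogy, when an operator hands an element from one thread to another in a spec-compliant way, the POJO's fields need no synchronization of their own, as long as only one thread touches a given object at a time.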

Upvotes: 3
