Geralt_Encore

Reputation: 3771

RxJava SerializedObserver implementation

While reading this article by David Karnok about RxJava internals, I came across an example implementation of something close to RxJava's SerializedObserver class. Here is the code:

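import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ValueListEmitterLoop<T> {
    List<T> queue;
    boolean emitting;
    Consumer<? super T> consumer;

    public void emit(T value) {
        // First synchronized block: either enqueue the value or become the emitter.
        synchronized (this) {
            if (emitting) {
                List<T> q = queue;
                if (q == null) {
                    q = new ArrayList<>();
                    queue = q;
                }
                q.add(value);
                return;
            }
            emitting = true;
        }
        // Deliver the value outside the lock.
        consumer.accept(value);
        for (;;) {
            List<T> q;
            // Second synchronized block: take the pending batch, or stop emitting.
            synchronized (this) {
                q = queue;
                if (q == null) {
                    emitting = false;
                    return;
                }
                queue = null;
            }
            q.forEach(consumer);
        }
    }
}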

So the question is: why was the local variable q introduced in the first synchronized block? I clearly see the reasoning behind it in the second synchronized block. Is there a reason I am missing for not just using:

if (queue == null) {
    queue = new ArrayList<>();
}
queue.add(value);

Upvotes: 2

Views: 102

Answers (1)

akarnokd

Reputation: 69997

I find it good practice to read fields into local variables, especially if they are used multiple times and some volatile/synchronized access is nearby.

For example, the following is a common pattern:

volatile boolean cancelled;

final Queue<T> queue;

final Subscriber<? super T> actual;

void drain() {
    Subscriber<? super T> a = actual;
    Queue<T> q = queue;

    for (;;) {
        if (cancelled) {
            return;
        }

        T v = q.poll();

        if (v == null) {
            a.onComplete();
            return;
        }

        a.onNext(v);
    }
}

If a and q were read directly from the fields inside the loop, the processor/JVM would have to reload them from memory on every iteration, because the volatile read of cancelled and the similar atomic operations inside poll() prevent those reads from being kept in registers.
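
For anyone who wants to run the pattern, here is a minimal self-contained sketch of the same drain loop. It is not RxJava code: the class name LocalCopyDrain, the offer/cancel/demo helpers, and the use of a plain Consumer plus a Runnable in place of Subscriber are all assumptions made so that it compiles against the JDK alone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for the drain loop above; Consumer and Runnable
// replace Subscriber.onNext and Subscriber.onComplete.
final class LocalCopyDrain<T> {
    volatile boolean cancelled;
    final Queue<T> queue = new ConcurrentLinkedQueue<>();
    final Consumer<? super T> actual;
    final Runnable onComplete;

    LocalCopyDrain(Consumer<? super T> actual, Runnable onComplete) {
        this.actual = actual;
        this.onComplete = onComplete;
    }

    void offer(T value) {
        queue.offer(value);
    }

    void cancel() {
        cancelled = true;
    }

    void drain() {
        // Read each field once; the loop below only touches the locals.
        Consumer<? super T> a = actual;
        Queue<T> q = queue;

        for (;;) {
            if (cancelled) {          // volatile read on every iteration
                return;
            }
            T v = q.poll();
            if (v == null) {          // queue is empty: signal completion
                onComplete.run();
                return;
            }
            a.accept(v);
        }
    }

    // Small usage check: drains two queued values, then signals completion.
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        LocalCopyDrain<String> d =
                new LocalCopyDrain<>(seen::add, () -> seen.add("done"));
        d.offer("a");
        d.offer("b");
        d.drain();
        return seen;
    }
}
```

Calling LocalCopyDrain.demo() delivers the two queued values in order and then runs the completion callback; calling cancel() before drain() makes the loop return without delivering anything.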

Upvotes: 2
