corvax
corvax

Reputation: 1145

Prove me that PublishSubject in RxJava is not thread safe

It is being declared that PublishSubject is not thread safe in RxJava. Ok.

I'm trying to find any example, I'm trying to construct any example to emulate race condition, that leads to unwanted results. But I can't :(

Can anyone provide an example proving that PublishSubject is not thread safe?

Upvotes: 1

Views: 2280

Answers (2)

corvax
corvax

Reputation: 1145

I've found the proof. I think this example more obvious then @akarnokd provided.

    AtomicInteger counter = new AtomicInteger();

    // Thread-safe
    // SerializedSubject<Object, Object> subject = PublishSubject.create().toSerialized();

    // Not Thread Safe
    PublishSubject<Object> subject = PublishSubject.create();

    Action1<Object> print = (x) -> System.out.println(Thread.currentThread().getName() + " " + counter);

    Consumer<Integer> sleep = (s) -> {
        try {
            Thread.sleep(s);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    subject
            .doOnNext(i -> counter.incrementAndGet())
            .doOnNext(i -> counter.decrementAndGet())
            .doOnNext(print)
            .filter(i -> counter.get() != 0)
            .doOnNext(i -> {
                        throw new NullPointerException("Concurrency detected");
                    }
            )
            .subscribe();

    Runnable r = () -> {
        for (int i = 0; i < 100000; i++) {
            sleep.accept(1);
            subject.onNext(i);
        }
    };

    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.execute(r);
    pool.execute(r);

Upvotes: 2

akarnokd
akarnokd

Reputation: 69997

Usually, people ask why their setup behaves unexpectedly and/or crashes and the answer is: because they call the onXXX methods on the Subject concurrently:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Scheduler.Worker;
import rx.exceptions.MissingBackpressureException;
import rx.observers.AssertableSubscriber;
import rx.schedulers.Schedulers;
import rx.subjects.*;

public class PublishSubjectRaceTest {

    @Test
    public void racy() throws Exception {
        Worker worker = Schedulers.computation().createWorker();
        try {
            for (int i = 0; i < 1000; i++) {
                AtomicInteger wip = new AtomicInteger(2);

                PublishSubject<Integer> ps = PublishSubject.create();

                AssertableSubscriber<Integer> as = ps.test(1);

                CountDownLatch cdl = new CountDownLatch(1);

                worker.schedule(() -> {
                    if (wip.decrementAndGet() != 0) {
                        while (wip.get() != 0) ;
                    }
                    ps.onNext(1);

                    cdl.countDown();
                });
                if (wip.decrementAndGet() != 0) {
                    while (wip.get() != 0) ;
                }
                ps.onNext(1);

                cdl.await();

                as.assertFailure(MissingBackpressureException.class, 1);
            }
        } finally {
            worker.unsubscribe();
        }
    }

    @Test
    public void nonRacy() throws Exception {
        Worker worker = Schedulers.computation().createWorker();
        try {
            for (int i = 0; i < 1000; i++) {
                AtomicInteger wip = new AtomicInteger(2);

                Subject<Integer, Integer> ps = PublishSubject.<Integer>create()
                    .toSerialized();

                AssertableSubscriber<Integer> as = ps.test(1);

                CountDownLatch cdl = new CountDownLatch(1);

                worker.schedule(() -> {
                    if (wip.decrementAndGet() != 0) {
                        while (wip.get() != 0) ;
                    }
                    ps.onNext(1);

                    cdl.countDown();
                });
                if (wip.decrementAndGet() != 0) {
                    while (wip.get() != 0) ;
                }
                ps.onNext(1);

                cdl.await();

                as.assertFailure(MissingBackpressureException.class, 1);
            }
        } finally {
            worker.unsubscribe();
        }
    }
}

Upvotes: 5

Related Questions