msl

Reputation: 21

ExecutorService thread safety

Let's say I have an instance of ExecutorService from one of the static factory methods in Executors.

If I submit a Callable<RetVal> from some thread, where RetVal is a non-thread-safe, locally instantiated object, do I need to worry about the RetVal's integrity when I get() it from that same thread? People say that local variables are thread-safe, but I am not sure whether that applies when a locally instantiated object is returned by one thread and received in another.

Here's an example similar to my situation:

ExecutorService executor = Executors.newFixedThreadPool(5);
Future<List<String>> fut = executor.submit(() -> {
    List<String> ret = new ArrayList<>();
    ret.add("aasdf");
    ret.add("dfls");
    return ret;
});

List<String> myList = fut.get();

In the above example, I'm retrieving an ArrayList that was created in a different thread, one created by the executor. I don't think the above code is thread-safe, but I was not able to find much information about my specific situation.

I tried the above code on my computer and it returned the expected result every time. I even tried it with my own implementation of an ExecutorService, and so far I have only gotten the expected results. So unless I have been extremely lucky, I am pretty sure it works, but I'm not sure how. I created a non-thread-safe object in one thread and received it in another; shouldn't there be a chance of receiving a partially constructed object, in my case a list that does not contain 2 strings?

Below is my custom implementation I made just to test. You can ignore the EType enum thingy.

class MyExecutor {

    enum EType {
        NoHolder, Holder1, Holder2
    }

    private ConcurrentLinkedQueue<MyFutureTask<?>> tasksQ;
    private final Thread thread;

    private final EType eType;

    public MyExecutor(EType eType) {
        // Assign the field; the original line only reassigned the parameter.
        this.eType = Objects.requireNonNull(eType);

        tasksQ = new ConcurrentLinkedQueue<>();
        thread = new Thread(new MyRunnable());
        thread.start();
    }

    public <T> Future<T> submit(Callable<T> c) {
        MyFutureTask<T> task = new MyFutureTask<T>(c, eType);
        tasksQ.add(task);
        return task;
    }

    class MyRunnable implements Runnable {
        @Override
        public void run() {
            while (true) {
                if (tasksQ.isEmpty()) {
                    try {
                        Thread.sleep(1);
                        continue;
                    } catch (InterruptedException ite) {
                        // Restore the interrupt flag instead of clearing it.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }

                MyFutureTask<?> task = tasksQ.poll();
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class MyFutureTask<T> implements RunnableFuture<T> {

        final Callable<?> cb;
        volatile Object outcome;

        static final int STATE_PENDING = 1;
        static final int STATE_EXECUTING = 2;
        static final int STATE_DONE = 3;

        final AtomicInteger atomicState = new AtomicInteger(STATE_PENDING);

        final EType eType;

        public MyFutureTask(Callable<?> cb, EType eType) {
            // Assign the final fields; the original lines only reassigned the parameters.
            this.cb = Objects.requireNonNull(cb);
            this.eType = Objects.requireNonNull(eType);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            throw new UnsupportedOperationException("cancel is not implemented");
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return atomicState.get() == STATE_DONE;
        }

        @SuppressWarnings("unchecked")
        @Override
        public T get() throws InterruptedException, ExecutionException {
            while (true) {
                switch (atomicState.get()) {
                case STATE_PENDING:
                case STATE_EXECUTING:
//                      Thread.sleep(1);
                    break;
                case STATE_DONE:
                    return (T)outcome;
                default:
                    throw new IllegalStateException();
                }
            }
        }

        @Override
        public T get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            throw new UnsupportedOperationException("timed get is not implemented");
        }

        void set(T t) {
            outcome = t;
        }

        @Override
        public void run() {
            if (atomicState.compareAndSet(STATE_PENDING, STATE_EXECUTING)) {
                Object result;
                try {
                    switch (eType) {
                    case NoHolder:
                        result = cb.call();
                        break;
                    case Holder1:
                    case Holder2:
                        throw new UnsupportedOperationException(eType + " is not implemented");
                    default:
                        throw new IllegalStateException();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    result = null;
                }

                outcome = result;
                atomicState.set(STATE_DONE);
            }
        }
    }
}

class MyTask implements Callable<List<Integer>> {
    @Override
    public List<Integer> call() throws Exception {
        List<Integer> ret = new ArrayList<>(100);
        IntStream.range(0, 100).boxed().forEach(ret::add);
        return ret;
    }
}

Upvotes: 2

Views: 968

Answers (2)

Tom Hawtin - tackline

Reputation: 147164

The important thing is the happens-before relationship. From ExecutorService API docs:

Memory consistency effects: Actions in a thread prior to the submission of a Runnable or Callable task to an ExecutorService happen-before any actions taken by that task, which in turn happen-before the result is retrieved via Future.get().

So you are safe to transfer a mutable object like this. The ExecutorService implementation transfers the object via some form of safe publication.

Obviously, don't update the object in the original thread after returning it.

If you were to communicate between threads by stashing in a shared non-volatile field, then that would be unsafe.
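To make the contrast concrete, here is a minimal sketch of a hand-rolled handoff that is safe only because the shared field is volatile. The class name VolatileHandoff and the field slot are hypothetical, chosen for illustration; the pattern is roughly what the custom MyFutureTask in the question relies on (a volatile outcome plus an AtomicInteger state), though this is not a description of how the JDK's own FutureTask is written.

```java
import java.util.ArrayList;
import java.util.List;

class VolatileHandoff {
    // Hypothetical shared slot. 'volatile' is what makes the handoff safe:
    // without it, the reader has no happens-before edge to the writer.
    static volatile List<String> slot;

    static List<String> produceAndAwait() throws InterruptedException {
        slot = null;
        Thread producer = new Thread(() -> {
            List<String> ret = new ArrayList<>();
            ret.add("aasdf");
            ret.add("dfls");
            slot = ret; // volatile write: both add() calls happen-before it
        });
        producer.start();

        List<String> seen;
        while ((seen = slot) == null) { // volatile read; loops until the write is visible
            Thread.yield();
        }
        producer.join();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(produceAndAwait());
    }
}
```

If slot were a plain field, the Java Memory Model would not guarantee that the reading thread ever sees the reference, or that it sees the list's contents once it does.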

Upvotes: 4

marthursson

Reputation: 3300

Thread safety becomes a concern when multiple threads try to access and modify the same state simultaneously.

Note that you will not get hold of the actual result from a Future until the task is finished (i.e. Future#get will not return until the task is finished).

In your first example, thread safety is not an issue because a new object (while mutable) is created by one thread (the thread created by the Executor) and retrieved from the Future object only once that thread has finished processing the task. Once the calling thread gets hold of the object, no other thread can modify it, because the creating thread no longer has access to the List.
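As a rough illustration of the point that Future#get does not return until the task has finished, the sketch below holds the task on a CountDownLatch so the caller can observe that it is still pending before releasing it. The class name GetBlocksDemo and the field doneBeforeRelease are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class GetBlocksDemo {
    // Records what isDone() reported while the task was still held back.
    static boolean doneBeforeRelease;

    static List<String> run() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            Future<List<String>> fut = executor.submit(() -> {
                release.await(); // the task cannot complete until the caller allows it
                List<String> ret = new ArrayList<>();
                ret.add("aasdf");
                ret.add("dfls");
                return ret;
            });

            doneBeforeRelease = fut.isDone(); // the task is still waiting on the latch
            release.countDown();              // let the task build and return its list
            return fut.get();                 // blocks until the task has completed
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
        System.out.println("done before release: " + doneBeforeRelease);
    }
}
```

By the time get() returns, the worker thread has finished with the list and keeps no reference to it, so the caller is the only thread that can touch it.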

Upvotes: 2
