Reputation: 21
Let's say I have an instance of ExecutorService from one of Executors static factory methods.
If I submit a Callable where RetVal is not a thread-safe, locally instantiated object from some thread, do I need to worry about RetVals' integrity when I get() it from the same thread? People say that local variables are thread-safe, but I am not sure if it applies when you're returning a locally instantiated Object and receiving it from some other thread.
Here's an example similar to my situation:
ExecutorService executor = Executors.newFixedThreadPool(5);
Future<List<String>> fut = executor.submit(() -> {
List<String> ret = new ArrayList<>();
ret.add("aasdf");
ret.add("dfls");
return ret;
});
List<String> myList = fut.get();
In the above example, I'm retrieving an ArrayList that was created in a different thread--one created by executor. I don't think above code is thread safe but I was not able to find much information regarding my specific situation.
Now I tried the above code on my computer and it actually returned the expected result 100% of the time I tried it, and I even tried with my own implementation of an ExecutorService and so far I have only got the expected results. So unless I have gotten extremely lucky I am pretty sure it works but I'm not sure how. I created a not thread-safe object in another thread and received it in another; shouldn't I have a chance to have received a partially constructed object--in my case a list that does not contain 2 strings?
Below is my custom implementation I made just to test. You can ignore the EType enum thingy.
class MyExecutor {
enum EType {
NoHolder, Holder1, Holder2
}
private ConcurrentLinkedQueue<MyFutureTask<?>> tasksQ;
private final Thread thread;
private final EType eType;
public MyExecutor(EType eType) {
eType = Objects.requireNonNull(eType);
tasksQ = new ConcurrentLinkedQueue<>();
thread = new Thread(new MyRunnable());
thread.start();
}
public <T> Future<T> submit(Callable<T> c) {
MyFutureTask<T> task = new MyFutureTask<T>(c, eType);
tasksQ.add(task);
return task;
}
class MyRunnable implements Runnable {
@Override
public void run() {
while (true) {
if (tasksQ.isEmpty()) {
try {
Thread.sleep(1);
continue;
} catch (InterruptedException ite) {
Thread.interrupted();
break;
}
}
MyFutureTask<?> task = tasksQ.poll();
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
class MyFutureTask<T> implements RunnableFuture<T> {
final Callable<?> cb;
volatile Object outcome;
static final int STATE_PENDING = 1;
static final int STATE_EXECUTING = 2;
static final int STATE_DONE = 3;
final AtomicInteger atomicState = new AtomicInteger(STATE_PENDING);
final EType eType;
public MyFutureTask(Callable<?> cb, EType eType) {
cb = Objects.requireNonNull(cb);
eType = Objects.requireNonNull(eType);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new NotImplementedException();
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return atomicState.get() == STATE_DONE;
}
@SuppressWarnings("unchecked")
@Override
public T get() throws InterruptedException, ExecutionException {
while (true) {
switch (atomicState.get()) {
case STATE_PENDING:
case STATE_EXECUTING:
// Thread.sleep(1);
break;
case STATE_DONE:
return (T)outcome;
default:
throw new IllegalStateException();
}
}
}
@Override
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
throw new NotImplementedException();
}
void set(T t) {
outcome = t;
}
@Override
public void run() {
if (atomicState.compareAndSet(STATE_PENDING, STATE_EXECUTING)) {
Object result;
try {
switch (eType) {
case NoHolder:
result = cb.call();
break;
case Holder1:
throw new NotImplementedException();
case Holder2:
throw new NotImplementedException();
default:
throw new IllegalStateException();
}
} catch (Exception e) {
e.printStackTrace();
result = null;
}
outcome = result;
atomicState.set(STATE_DONE);
}
}
}
}
class MyTask implements Callable<List<Integer>> {
@Override
public List<Integer> call() throws Exception {
List<Integer> ret = new ArrayList<>(100);
IntStream.range(0, 100).boxed().forEach(ret::add);
return ret;
}
}
Upvotes: 2
Views: 968
Reputation: 147164
The important thing is the happens-before relationship. From ExecutorService
API docs:
Memory consistency effects: Actions in a thread prior to the submission of a
Runnable
orCallable
task to anExecutorService
happen-before any actions taken by that task, which in turn happen-before the result is retrieved viaFuture.get()
.
So you are safe to transfer a mutable object like this. The ExecutorService
implementation transfers the object via some form of safe publication.
Obviously, don't update the object in the original thread after returning it.
If you were to communicate between threads by stashing in a shared non-volatile
field, then that would be unsafe.
Upvotes: 4
Reputation: 3300
Thread safety becomes a concern when multiple threads try to access and modify the same state simultaneously.
Note that you will not get hold of the actual result from a Future
until the task is finished (i.e. Future#get
will not return until the task is finished).
In your first example, thread safety is not an issue because the a new object (while mutable) is created by one thread (the thread created by the Executor) and retrieved from the Future
object once that thread has finished processing the task. Once the calling thread gets hold of the object, it cannot be modified by any other thread, because the creating thread no longer has access to the List.
Upvotes: 2