Reputation: 981
I am trying to run some blocking operations (say, HTTP requests) in a scheduled, non-blocking manner. Say I have 10 requests and each one takes 3 seconds; rather than waiting 3 seconds for one to finish, I would like to wait 1 second and then send the next one. After all executions have finished, I would like to gather the results in a list and return it to the user.
Below is a prototype of my scenario (Thread.sleep stands in for the blocking HTTP request):
public static List<Integer> getResults(List<Integer> inputs) throws InterruptedException, ExecutionException {
    List<Integer> results = new LinkedList<Integer>();
    Queue<Callable<Integer>> tasks = new LinkedList<Callable<Integer>>();
    List<Future<Integer>> futures = new LinkedList<Future<Integer>>();
    for (Integer input : inputs) {
        Callable<Integer> task = new Callable<Integer>() {
            public Integer call() throws InterruptedException {
                Thread.sleep(3000);
                return input + 1000;
            }
        };
        tasks.add(task);
    }
    ExecutorService es = Executors.newCachedThreadPool();
    ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
    ses.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            Callable<Integer> task = tasks.poll();
            if (task == null) {
                ses.shutdown();
                es.shutdown();
                return;
            }
            futures.add(es.submit(task));
        }
    }, 0, 1000, TimeUnit.MILLISECONDS);
    while (true) {
        if (futures.size() == inputs.size()) {
            for (Future<Integer> future : futures) {
                Integer result = future.get();
                results.add(result);
            }
            return results;
        }
    }
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
    List<Integer> results = getResults(new LinkedList<Integer>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
    System.out.println(Arrays.toString(results.toArray()));
}
I am waiting in a while loop until all tasks have been submitted and then collecting their results. But the condition that breaks out of the loop is never satisfied, so it loops forever. Whenever I put an I/O operation such as a logger call inside the loop, or even set a breakpoint, the loop exits and everything works.
I am relatively new to Java concurrency and am trying to understand what is happening and whether this is the correct way to do it. My guess is that the I/O operation triggers something in the thread scheduler that makes it re-check the collections' sizes.
Upvotes: 1
Views: 212
Reputation: 837
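You need to synchronize your threads. Two different threads (the main thread and the scheduled executor's thread) access the futures list, and since LinkedList is not synchronized, the two threads can see different states of futures. Both the loop that reads the list and the task that adds to it must synchronize on the same object; the reading side looks like this: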
while (true) {
    synchronized (futures) {
        if (futures.size() == inputs.size()) {
            ...
        }
    }
}
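As a minimal sketch of what "both sides synchronize" means, the example below uses a hypothetical class and helper (SynchronizedListDemo, waitForItems) that are not part of the question's code. A writer thread appends to a plain ArrayList while the calling thread polls its size, and both take the same lock, so the reader is guaranteed to see the writer's updates eventually.

```java
import java.util.ArrayList;
import java.util.List;

public class SynchronizedListDemo {

    // Hypothetical helper: a writer thread adds `count` items to a shared list
    // while the calling thread polls the size until all items have arrived.
    public static int waitForItems(int count) throws InterruptedException {
        final List<Integer> shared = new ArrayList<>();

        Thread writer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                // Writer side: lock on the same object the reader uses.
                synchronized (shared) {
                    shared.add(i);
                }
            }
        });
        writer.start();

        while (true) {
            // Reader side: the lock makes the writer's additions visible here.
            synchronized (shared) {
                if (shared.size() == count) {
                    return shared.size();
                }
            }
            // Short pause outside the lock so the loop does not spin at full speed.
            Thread.sleep(10);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(waitForItems(5)); // prints 5
    }
}
```

Wrapping the list with Collections.synchronizedList would give the same visibility for individual calls such as add and size, although iterating over such a list still needs an explicit synchronized block.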
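The two threads can see different values because each thread is allowed to work with cached copies of shared variables (in CPU caches or registers) to improve performance. One thread's writes are therefore not guaranteed to become visible to another thread until the two synchronize. This SO question has more information on this.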
Also from this answer:
It's all about memory. Threads communicate through shared memory, but when there are multiple CPUs in a system, all trying to access the same memory system, then the memory system becomes a bottleneck. Therefore, the CPUs in a typical multi-CPU computer are allowed to delay, re-order, and cache memory operations in order to speed things up.
That works great when threads are not interacting with one another, but it causes problems when they actually do want to interact: If thread A stores a value into an ordinary variable, Java makes no guarantee about when (or even if) thread B will see the value change.
In order to overcome that problem when it's important, Java gives you certain means of synchronizing threads. That is, getting the threads to agree on the state of the program's memory. The volatile keyword and the synchronized keyword are two means of establishing synchronization between threads.
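To illustrate the volatile keyword mentioned in the quote, here is a small sketch; the class and method names (VolatileFlagDemo, runOnce) are made up for this example. The worker writes an ordinary field and then sets a volatile flag. Because the reader spins on the volatile flag, it is guaranteed to see both the flag change and the earlier write to the ordinary field.

```java
public class VolatileFlagDemo {

    // volatile: every read of this field observes the most recent write by any thread.
    private static volatile boolean done = false;

    // Ordinary field, made visible to the reader by the volatile write that follows it.
    private static int payload = 0;

    // Hypothetical helper: starts a worker and waits for it to publish a value.
    public static int runOnce() throws InterruptedException {
        done = false;
        payload = 0;

        Thread worker = new Thread(() -> {
            payload = 42;  // ordinary write
            done = true;   // volatile write, publishes everything written before it
        });
        worker.start();

        // Without volatile on `done`, this loop would be allowed to spin forever.
        while (!done) {
            Thread.yield();
        }
        worker.join();
        return payload;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnce()); // prints 42
    }
}
```

Note that volatile on a list reference would not help in the question's code: the reference never changes, only the list's contents do, so the list itself has to be synchronized or replaced by a thread-safe collection.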
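And finally, the main thread never observes the updated futures list in your code because it is continuously occupied by the infinite while loop, and nothing in that loop forces it to re-read shared memory. An I/O operation inside the loop (a logger call, for instance) typically involves synchronization internally, which as a side effect lets the main thread see the up-to-date list. That is an accident of the implementation, not something to rely on.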
An infinite while loop is generally a bad idea because it is very resource intensive. Adding a small delay before the next iteration can make it a little better (though still inefficient).
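One way to avoid the polling loop altogether, sketched here under some assumptions, is to schedule every task up front with an increasing delay and then block on the futures. The class name StaggeredRequests and the spacingMillis and workMillis parameters are introduced only for this sketch (the question uses fixed values of 1000 and 3000), and the pool is sized to the number of inputs so that overlapping tasks are not queued behind each other. Since only the calling thread touches the futures list, no extra synchronization is needed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class StaggeredRequests {

    // Starts one task every spacingMillis; each task blocks for workMillis
    // (a stand-in for the HTTP request) and returns input + 1000.
    public static List<Integer> getResults(List<Integer> inputs, long spacingMillis, long workMillis)
            throws InterruptedException, ExecutionException {
        // Assumption: one thread per input, so a running task never delays the next start.
        ScheduledExecutorService ses =
                Executors.newScheduledThreadPool(Math.max(1, inputs.size()));
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            long delay = 0;
            for (Integer input : inputs) {
                // schedule(Callable, delay, unit) returns a future for the task's result.
                futures.add(ses.schedule(() -> {
                    Thread.sleep(workMillis);
                    return input + 1000;
                }, delay, TimeUnit.MILLISECONDS));
                delay += spacingMillis;
            }

            // Future.get() blocks until each result is ready, so no busy loop is required.
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get());
            }
            return results;
        } finally {
            ses.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Short delays keep the demo quick; the question's values would be 1000 and 3000.
        System.out.println(getResults(Arrays.asList(1, 2, 3), 100, 300));
        // prints [1001, 1002, 1003]
    }
}
```

Results come back in input order because the futures are read in the order they were scheduled, regardless of which task finishes first.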
Upvotes: 1