Reputation: 12001
I'm using ThreadPoolTaskExecutor (of spring) in order to execute some tasks asynchronously.
The required task will load some object from an external DB into my system memory. I'm using max thread pool size of 10 and max queue size of 100.
Suppose that all 10 threads are occupied getting objects from my DB and a task is created, it will go to the queue. Now another task is created that should get the same object (same key in DB) from the DB, it will also go to the queue (assuming all 10 threads are still occupied).
So my queue might get full easily with duplicated tasks which will get executed in turn and I don't want this to happen.
I thought that a solution should come in the form of a unique collection which serves as the thread pool queue. Under the hood ThreadPoolTaskExecutor uses LinkedBlockingQueue which does not provide uniqueness.
I thought of a few possible solutions but none satisfies me:
This led me to try and extend LinkedBlockingQueue and override add:
public boolean add(E e)
if(!this.contains(e)) {
return super.add(e);
} else {
return false;
}
}
But as far as I can tell this will lead to a major performance reduction since the contains
method is limited by O(n) - bad idea.
What could solve my problem? I'm aiming for a good performance (in case of memory-performance trade offs I don't mind giving up memory for performance).
Upvotes: 8
Views: 4166
Reputation: 12001
A solution similar to the accepted solution but based on Spring (as opposed to Guava):
Create interface RunnableWithId:
public interface RunnableWithId extends Runnable {
/**
* @return A unique id for this task
*/
String getTaskId();
}
Create another interface TaskWithIdExecutor:
import org.springframework.core.task.TaskExecutor;
public interface TaskWithIdExecutor extends TaskExecutor {
/**
* Executes the given task if it is not queued or already running
*
* @param task The task to execute
*/
void executeIfNotQueuedOrRunningAlready(RunnableWithId task);
}
Create your custom executor UniquTaskExecutor:
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.Set;
/**
* In addition to all the abilities of ThreadPoolTaskExecutor adds the ability
* to execute a task only if it is not already running/queued using the
* executeIfNotQueuedOrRunningAlready method.
*
* @see ThreadPoolTaskExecutor
*/
public class UniquTaskExecutor extends ThreadPoolTaskExecutor implements TaskWithIdExecutor {
private Set<String> queuedTasks;
public UniquTaskExecutor() {
queuedTasks = Sets.newConcurrentHashSet();
}
@Override
public void execute(Runnable task) {
super.execute(task);
}
/**
* @param task The task to execute
*/
@Override
public void executeIfNotQueuedOrRunningAlready(RunnableWithId task) {
if (queuedTasks.add(task.getTaskId())) {
ListenableFuture<?> res = submitListenable(task);
res.addCallback(new ListenableFutureCallback<Object>() {
@Override
public void onFailure(Throwable throwable) {
queuedTasks.remove(task.getTaskId());
}
@Override
public void onSuccess(Object o) {
queuedTasks.remove(task.getTaskId());
}
});
}
}
}
Use the executeIfNotQueuedOrRunningAlready method of UniquTaskExecutor to achieve uniqueness in task executions.
Upvotes: 1
Reputation: 4239
Using Guava and ListenableFuture you could do something like that (haven't tested)
Set<String> uniqueQueue = Sets.newConcurrentHashSet();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, Queues.newLinkedBlockingQueue(100));
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
String t1 = "abc";
if(uniqueQueue.add(t1)) {
ListenableFuture<String> future = executorService.submit(() -> "do something with " + t1);
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
uniqueQueue.remove(t1);
}
@Override
public void onFailure(Throwable t) {
uniqueQueue.remove(t1);
}
});
}
resulting in
uniqueQueue
)uniqueQueue
this implementation does not handle
Exceptions
thrown by the submit()
methodunqiueQueue
With reference to your requirement of loading objects from a database into memory, you might want to take a look at Guava's Caches.
UPDATE:
Upvotes: 5
Reputation: 1277
If you're allowed to manage the database, I'd suggest using the database itself to prevent duplicate effort:
Then try something like this, in your main thread:
Random rand = new Random();
int lockId = rand.nextInt(Integer.MAX_VALUE - 1) + 1;
String update = "UPDATE DB.Table SET lockid=" + lockId + " WHERE lockid=0 AND status='new' " // + AND your conditions + LIMIT ##
String select = "SELECT * FROM DB.Table WHERE lockid=" + lockId;
// now execute those sql statements with QueryRunner or whatever you use in-house
The rows that return from the select are what you add to the queue.
Then, you have a class that implements Runnable that processes those rows, by retrieving them from the queue. Once it processes a row, you do another SQL update (inside the Runnable) to set the lockId back to zero and set the status to 'done'.
This has the advantage of working even if you have multiple machines each with their own queue.
Upvotes: 0