forhas

Reputation: 12001

Thread pool queue with unique tasks

I'm using ThreadPoolTaskExecutor (of spring) in order to execute some tasks asynchronously.

The required task will load some object from an external DB into my system memory. I'm using max thread pool size of 10 and max queue size of 100.

Suppose that all 10 threads are occupied getting objects from my DB and a task is created, it will go to the queue. Now another task is created that should get the same object (same key in DB) from the DB, it will also go to the queue (assuming all 10 threads are still occupied).

So my queue might get full easily with duplicated tasks which will get executed in turn and I don't want this to happen.

I thought that a solution should come in the form of a unique collection which serves as the thread pool queue. Under the hood ThreadPoolTaskExecutor uses LinkedBlockingQueue which does not provide uniqueness.

I thought of a few possible solutions, but none of them satisfies me.

This led me to try and extend LinkedBlockingQueue and override add:

@Override
public boolean add(E e) {
    if (!this.contains(e)) {
        return super.add(e);
    } else {
        return false;
    }
}

But as far as I can tell this would cause a major performance hit, since LinkedBlockingQueue's contains method runs in O(n) - bad idea.
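For reference, the O(n) membership test can be avoided by mirroring the queue contents in a hash-based set, so contains becomes an O(1) set lookup. A rough, untested sketch (DedupQueue is a made-up name, and a real version would also need to override offer, which executors actually call, as well as poll and remove):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class DedupQueue<E> extends LinkedBlockingQueue<E> {
    // Mirrors the queue contents; a ConcurrentHashMap-backed set gives O(1)
    // membership checks. Elements must have sensible equals/hashCode.
    private final Set<E> members = ConcurrentHashMap.newKeySet();

    @Override
    public boolean add(E e) {
        // Only enqueue if the element is not already tracked.
        if (members.add(e)) {
            return super.add(e);
        }
        return false;
    }

    @Override
    public E take() throws InterruptedException {
        E e = super.take();
        members.remove(e); // allow the element to be enqueued again once taken
        return e;
    }
}
```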

What could solve my problem? I'm aiming for a good performance (in case of memory-performance trade offs I don't mind giving up memory for performance).

Upvotes: 8

Views: 4166

Answers (3)

forhas

Reputation: 12001

A solution similar to the accepted solution but based on Spring (as opposed to Guava):

Create interface RunnableWithId:

public interface RunnableWithId extends Runnable {

    /**
     * @return A unique id for this task
     */
    String getTaskId();
}

Create another interface TaskWithIdExecutor:

import org.springframework.core.task.TaskExecutor;


public interface TaskWithIdExecutor extends TaskExecutor {

    /**
     * Executes the given task if it is not queued or already running
     *
     * @param task The task to execute
     */
    void executeIfNotQueuedOrRunningAlready(RunnableWithId task);
}

Create your custom executor UniquTaskExecutor:

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * In addition to all the abilities of ThreadPoolTaskExecutor, adds the ability
 * to execute a task only if it is not already running/queued using the
 * executeIfNotQueuedOrRunningAlready method.
 *
 * @see ThreadPoolTaskExecutor
 */
public class UniquTaskExecutor extends ThreadPoolTaskExecutor implements TaskWithIdExecutor {

    private final Set<String> queuedTasks;

    public UniquTaskExecutor() {
        // JDK concurrent set, so no Guava dependency is needed
        queuedTasks = ConcurrentHashMap.newKeySet();
    }

    @Override
    public void execute(Runnable task) {
        super.execute(task);
    }

    /**
     * @param task The task to execute
     */
    @Override
    public void executeIfNotQueuedOrRunningAlready(RunnableWithId task) {
        if (queuedTasks.add(task.getTaskId())) {
            ListenableFuture<?> res = submitListenable(task);
            res.addCallback(new ListenableFutureCallback<Object>() {
                @Override
                public void onFailure(Throwable throwable) {
                    queuedTasks.remove(task.getTaskId());
                }

                @Override
                public void onSuccess(Object o) {
                    queuedTasks.remove(task.getTaskId());
                }
            });
        }
    }
}

Use the executeIfNotQueuedOrRunningAlready method of UniquTaskExecutor to achieve uniqueness in task executions.
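The same bookkeeping can be sketched with plain JDK types, without Spring's ListenableFuture (UniqueTaskRunner and executeIfAbsent are made-up names for illustration): the id is added to a concurrent set before submission and removed in a finally block, so it is released whether the task succeeds or fails.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

public class UniqueTaskRunner {
    private final ExecutorService delegate;
    // Ids of tasks that are currently queued or running.
    private final Set<String> inFlight = ConcurrentHashMap.newKeySet();

    public UniqueTaskRunner(ExecutorService delegate) {
        this.delegate = delegate;
    }

    /** Submits the task only if no task with the same id is queued or running. */
    public boolean executeIfAbsent(String taskId, Runnable task) {
        if (!inFlight.add(taskId)) {
            return false; // duplicate: already queued or running
        }
        delegate.execute(() -> {
            try {
                task.run();
            } finally {
                inFlight.remove(taskId); // release the id even if the task fails
            }
        });
        return true;
    }
}
```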

Upvotes: 1

marco.eig

Reputation: 4239

Using Guava and ListenableFuture you could do something like this (untested):

Set<String> uniqueQueue = Sets.newConcurrentHashSet();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, Queues.newLinkedBlockingQueue(100));
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

String t1 = "abc";
if(uniqueQueue.add(t1)) {
    ListenableFuture<String> future = executorService.submit(() -> "do something with " + t1);
    Futures.addCallback(future, new FutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            uniqueQueue.remove(t1);
        }

        @Override
        public void onFailure(Throwable t) {
            uniqueQueue.remove(t1);
        }
    });
}

resulting in

  • only items that are not currently being processed or in the queue will be added to the queue (uniqueQueue)
  • items that have been processed will be removed from the uniqueQueue
  • you'll only have a maximum of 100 items in the queue

this implementation does not handle

  • Exceptions thrown by the submit() method
  • Maximum number of items in the uniqueQueue

With reference to your requirement of loading objects from a database into memory, you might want to take a look at Guava's Caches.
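As a minimal stand-in for such a cache using only the JDK (SimpleObjectCache and loadFromDb are illustrative placeholders), ConcurrentHashMap.computeIfAbsent already guarantees the loader runs at most once per key, even when several threads request the same key concurrently:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class SimpleObjectCache {
    private final Map<String, Object> cache = new ConcurrentHashMap<>();
    final AtomicInteger dbLoads = new AtomicInteger(); // counts simulated DB hits

    public Object get(String key) {
        // computeIfAbsent invokes the loader at most once per absent key;
        // concurrent callers for the same key wait for that single load.
        return cache.computeIfAbsent(key, this::loadFromDb);
    }

    // Placeholder for the real DB lookup.
    private Object loadFromDb(String key) {
        dbLoads.incrementAndGet();
        return "value-for-" + key;
    }
}
```

Unlike Guava's caches, this never evicts, so memory grows with the key space - the question explicitly accepts that trade-off.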

Upvotes: 5

spudone

Reputation: 1277

If you're allowed to manage the database, I'd suggest using the database itself to prevent duplicate effort:

  • Add a lockid column to your table
  • Add a status column to your table (maybe 'new' and 'done')
  • Make sure your DB isolation level is at least READ_COMMITTED

Then try something like this, in your main thread:

Random rand = new Random();
int lockId = rand.nextInt(Integer.MAX_VALUE - 1) + 1;
String update = "UPDATE DB.Table SET lockid=" + lockId + " WHERE lockid=0 AND status='new'"; // + AND your conditions + LIMIT ##
String select = "SELECT * FROM DB.Table WHERE lockid=" + lockId;
// now execute those sql statements with QueryRunner or whatever you use in-house

The rows that return from the select are what you add to the queue.

Then you have a class implementing Runnable that processes those rows, retrieving them from the queue. Once it has processed a row, it runs another SQL update (inside the Runnable) that sets the lockid back to zero and the status to 'done'.

This has the advantage of working even if you have multiple machines each with their own queue.
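To make the claim/select/release cycle concrete without a real database, here is a plain-Java simulation (a Map stands in for the table; the column and status names mirror the answer, everything else is illustrative):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LockColumnDemo {
    // Each row: status ("new"/"done") and lockId (0 = unclaimed).
    static class Row {
        String status = "new";
        int lockId = 0;
    }

    final Map<String, Row> table = new LinkedHashMap<>();

    /**
     * Mimics: UPDATE ... SET lockid=? WHERE lockid=0 AND status='new',
     * then SELECT ... WHERE lockid=? - returns the keys this worker claimed.
     */
    synchronized List<String> claimRows(int lockId) {
        table.values().stream()
             .filter(r -> r.lockId == 0 && "new".equals(r.status))
             .forEach(r -> r.lockId = lockId);
        return table.entrySet().stream()
                    .filter(e -> e.getValue().lockId == lockId)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
    }

    /** Mimics the worker's final UPDATE: lockid back to 0, status 'done'. */
    synchronized void markDone(String key) {
        Row r = table.get(key);
        r.lockId = 0;
        r.status = "done";
    }
}
```

Because each worker only sees rows carrying its own random lockId, two workers (even on different machines) can never claim the same row - in a real database the isolation level, not `synchronized`, provides that guarantee.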

Upvotes: 0
