dentex

Reputation: 3263

AsyncTask on Executor and PriorityBlockingQueue implementation issue

Following up on this SO question, and with many hints from another one, I'm trying to implement an AsyncTask variant whose tasks can be prioritized.

In my CustomAsyncTask class I have:

public abstract class CustomAsyncTask<Params, Progress, Result> {

    private static int CORE_POOL_SIZE = 1;
    private static int MAXIMUM_POOL_SIZE = 1;

    private static final int KEEP_ALIVE = 1;

    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "CustomAsyncTask #" + mCount.getAndIncrement());
        }
    };

    private static final BlockingQueue<DownloadTask> pPoolWorkQueue =
            new PriorityBlockingQueue<DownloadTask>(10, new DownloadTasksComparator());

    @SuppressWarnings({ "unchecked", "rawtypes" })
    public static Executor PRIORITY_THREAD_POOL_EXECUTOR
            = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
                    TimeUnit.SECONDS, (PriorityBlockingQueue) pPoolWorkQueue, sThreadFactory);

   //...
}

The comparator:

public class DownloadTasksComparator implements Comparator<DownloadTask> {

    @Override
    public int compare(DownloadTask arg0, DownloadTask arg1) {
        int res;

        if (arg0 == null && arg1 == null) {
            res = 0;
        } else if (arg0 == null) {
            res = -1;
        } else if (arg1 == null) {
            res = 1;
        } else {
            // Only dereference the tasks once both are known to be non-null.
            res = arg0.getPriority() - arg1.getPriority();
        }

        return res;
    }
}

In the DownloadTask class, which extends CustomAsyncTask, I have an Integer priority field and a getPriority() method.

I start the tasks like this:

DownloadTask dt = new DownloadTask(..., PRIORITY_NORMAL, ...);
dt.executeOnExecutor(CustomAsyncTask.PRIORITY_THREAD_POOL_EXECUTOR);

This works: if the pool size is 1, the downloads are executed one by one; if the pool size is 2, two run at a time, and so on.

Note: the priority integers have arbitrary values:

public static final int PRIORITY_HIGH = 10;
public static final int PRIORITY_NORMAL = 1;

But if I call the tasks as:

DownloadTask dt = new DownloadTask(..., PRIORITY_HIGH, ...);
dt.executeOnExecutor(CustomAsyncTask.PRIORITY_THREAD_POOL_EXECUTOR);

I get a java.lang.ClassCastException: my.pkg.name.CustomAsyncTask$3 cannot be cast to my.pkg.name.DownloadTask

and then

at my.pkg.name.DownloadTasksComparator.compare(DownloadTasksComparator.java:1)
at java.util.concurrent.PriorityBlockingQueue.siftUpUsingComparator(PriorityBlockingQueue.java:334)
at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:447)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1295)
at my.pkg.name.CustomAsyncTask.executeOnExecutor(CustomAsyncTask.java:494)
at my.pkg.name.GetDownloadTaskListener$1.finishDownload(GetDownloadTaskListener.java:180)
at my.pkg.name.DownloadTask.onPostExecute(DownloadTask.java:330)
at my.pkg.name.DownloadTask.onPostExecute(DownloadTask.java:1)
at my.pkg.name.CustomAsyncTask.finish(CustomAsyncTask.java:536)
at my.pkg.name.CustomAsyncTask.access$0(CustomAsyncTask.java:532)
at my.pkg.name.CustomAsyncTask$InternalHandler.handleMessage(CustomAsyncTask.java:549)
at android.os.Handler.dispatchMessage(Handler.java:99)
at android.os.Looper.loop(Looper.java:137)
at android.app.ActivityThread.main(ActivityThread.java:4745)
at java.lang.reflect.Method.invokeNative(Native Method)
at java.lang.reflect.Method.invoke(Method.java:511)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:786)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:553)
at dalvik.system.NativeStart.main(Native Method)

all from AndroidRuntime

I really don't have a clue...

EDIT: At this point, I've put together a small Eclipse project that implements things exactly the same way as the bigger application and suffers from the same issue. It borrows CustomAsyncTaskComparator and CustomAsyncTask verbatim. There is no visual feedback; the app's progress is only visible as some LogCat output, but it gives the idea. When more than two tasks are enqueued, the app force-closes.

https://www.dropbox.com/s/lrg4kscgw3f1xwr/ConcurrentTest.tar.gz

Upvotes: 0

Views: 1745

Answers (3)

handhand

Reputation: 756

The problem is the PriorityBlockingQueue<DownloadTask> you use to construct the ThreadPoolExecutor. ThreadPoolExecutor expects a BlockingQueue<Runnable>, because it uses this queue to hold the runnables submitted through execute(Runnable r).

In AsyncTask's case, executeOnExecutor() calls Executor.execute(Runnable r) with a FutureTask object, so it is this FutureTask that the executor queues, not the AsyncTask itself. When the comparator tries to cast the runnable to DownloadTask, the exception is thrown. I guess my.pkg.name.CustomAsyncTask$3 may be an inner runnable class.
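The failure can be reproduced without Android at all. The following is a minimal sketch, not the asker's code: Job, JobComparator and QueueCastDemo are hypothetical stand-ins, and the FutureTask objects are offered to the queue directly instead of going through an executor. It only illustrates that a comparator typed for one class fails as soon as the queue has to compare objects of another class.

```java
import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;

/** Hypothetical stand-in for DownloadTask: it only carries a priority. */
class Job {
    final int priority;

    Job(int priority) {
        this.priority = priority;
    }
}

/** Comparator typed for Job only, much like DownloadTasksComparator is typed for DownloadTask. */
class JobComparator implements Comparator<Job> {
    @Override
    public int compare(Job a, Job b) {
        return Integer.compare(a.priority, b.priority);
    }
}

class QueueCastDemo {
    /** Returns true if queueing FutureTasks makes the comparator throw ClassCastException. */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    static boolean failsOnFutureTasks() {
        BlockingQueue<Job> typed = new PriorityBlockingQueue<Job>(10, new JobComparator());
        BlockingQueue raw = typed; // a raw view, comparable to the raw cast in the question
        try {
            // An executor would offer FutureTask objects like these, not Job instances.
            raw.offer(new FutureTask<Void>(() -> null));
            raw.offer(new FutureTask<Void>(() -> null));
            return false;
        } catch (ClassCastException e) {
            return true; // the comparator's implicit cast to Job failed
        }
    }

    public static void main(String[] args) {
        System.out.println("ClassCastException thrown: " + failsOnFutureTasks());
    }
}
```

The raw view is what lets the mismatch compile; with a properly typed BlockingQueue<Runnable> and Comparator<Runnable>, the compiler would have rejected the DownloadTask-only comparator.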

Here is part of the source code of AsyncTask:

public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
        Params... params) {
    if (mStatus != Status.PENDING) {
        switch (mStatus) {
            case RUNNING:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task is already running.");
            case FINISHED:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task has already been executed "
                        + "(a task can be executed only once)");
        }
    }

    mStatus = Status.RUNNING;

    onPreExecute();

    mWorker.mParams = params;
    exec.execute(mFuture);

    return this;
}

If you want to run your AsyncTask with a priority using this PriorityBlockingQueue approach, you should subclass the runnable object passed to the execute method and give it a priority. You are using a custom AsyncTask, so you have full control over these variables.

For those trying to run the original AsyncTask with a priority, I don't think much can be done. AsyncTask executes a private member variable, mFuture, so your hands are tied.

I am currently working on a similar problem, but in my case the "priority" is a timestamp that I can set in the runnable implementation, so it is not restricted by AsyncTask.

This is my code, just in case it might help someone. Sorry for my poor English!

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/* Executor used by AsyncTask. Use it with executeOnExecutor(). */
class PriorityExecutor extends ThreadPoolExecutor {

    // workQueue is expected to be an instance of PriorityBlockingQueue<Runnable>
    public PriorityExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    // Wrap the runnable passed in, so that the queue holds comparable objects.
    @Override
    public void execute(Runnable command) {
        super.execute(new PriorityRunnableWrapper(command));
    }
}

// A wrapper class that attaches a "priority" to any Runnable
class PriorityRunnableWrapper implements Runnable, Comparable<PriorityRunnableWrapper> {

    long addedTime; // the "priority"
    Runnable r;

    public PriorityRunnableWrapper(Runnable r) {
        this.r = r;
        addedTime = System.nanoTime(); // in my case the timestamp is the priority
    }

    @Override
    public void run() {
        r.run();
    }

    @Override
    public int compareTo(PriorityRunnableWrapper another) {
        if (addedTime == another.addedTime) return 0;
        // A later timestamp sorts first, so the most recently added task runs next.
        return addedTime - another.addedTime > 0 ? -1 : 1;
    }

}

Upvotes: 0

corsair992

Reputation: 3070

As you may have noticed while looking through the AsyncTask implementation, it internally uses a FutureTask to handle the background task, and that is what gets handed to the Executor and potentially queued on its work queue. Since you are already deriving your own implementation, you could replace the FutureTask with a custom derivative that holds a reference to the AsyncTask, to be accessed from your Comparator implementation.

Also, instead of replacing the default static Executor of your custom AsyncTask derivative, you should instead use the executeOnExecutor() method in your subclasses so that it can be used in a generic manner.
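The FutureTask derivative suggested above could look roughly like the sketch below. It runs on a plain JVM, and every name in it (OwnerTask, OwnedFutureTask, OwnerPriorityComparator, OwnedFutureDemo) is hypothetical. OwnerTask merely stands in for the custom AsyncTask, and the rule that a larger priority value runs first is an assumption made for the demo.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the custom AsyncTask: a name and a priority. */
class OwnerTask {
    final String name;
    final int priority;

    OwnerTask(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }
}

/** FutureTask derivative that remembers which task it belongs to. */
class OwnedFutureTask<V> extends FutureTask<V> {
    final OwnerTask owner;

    OwnedFutureTask(OwnerTask owner, Callable<V> work) {
        super(work);
        this.owner = owner;
    }
}

/** Compares queued Runnables through their owner; larger priority first (assumed). */
class OwnerPriorityComparator implements Comparator<Runnable> {
    @Override
    public int compare(Runnable a, Runnable b) {
        return Integer.compare(priorityOf(b), priorityOf(a));
    }

    private static int priorityOf(Runnable r) {
        // Runnables of any other type get a default priority instead of a failed cast.
        return (r instanceof OwnedFutureTask) ? ((OwnedFutureTask<?>) r).owner.priority : 0;
    }
}

class OwnedFutureDemo {
    /** Queues a normal and a high priority task behind a blocker and returns the run order. */
    static List<String> runOrder() {
        final List<String> order = Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS,
                new PriorityBlockingQueue<Runnable>(10, new OwnerPriorityComparator()));

        // Occupy the single worker so that the next two tasks have to wait in the queue.
        pool.execute(new OwnedFutureTask<Void>(new OwnerTask("blocker", 0),
                () -> { gate.await(); return null; }));
        OwnerTask[] tasks = { new OwnerTask("normal", 1), new OwnerTask("high", 10) };
        for (final OwnerTask t : tasks) {
            pool.execute(new OwnedFutureTask<Void>(t, () -> { order.add(t.name); return null; }));
        }

        gate.countDown(); // release the worker; it now drains the queue in priority order
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runOrder());
    }
}
```

Typing the queue and the comparator as Runnable removes the need for a raw cast, and the instanceof check keeps unrelated runnables from triggering a ClassCastException.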

Upvotes: 1

CommonsWare

Reputation: 1006869

CustomAsyncTask, like AsyncTask, uses a static queue. Everything that goes on that queue, at present, will run through your DownloadTasksComparator. However, DownloadTasksComparator only works with DownloadTask. So long as you only use DownloadTask, and not other subclasses of CustomAsyncTask, you will be fine. However, apparently you have some other anonymous inner class extending CustomAsyncTask, and that's not a DownloadTask.

Keep CustomAsyncTask abstract and give it an abstract getPriority() method. Rename DownloadTasksComparator to CustomAsyncTaskComparator and have it compare CustomAsyncTask instances. Your other subclasses of CustomAsyncTask would then need to implement their own getPriority() methods, so that they can be sorted along with the DownloadTask instances in your work queue.
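As a rough illustration of that arrangement, the sketch below orders instances of two different subclasses with one comparator written against the common base class. BaseTask, DownloadJob, CleanupJob, BaseTaskComparator and MixedQueueDemo are hypothetical names, and "larger value first" is an assumed ordering. The sketch exercises only the queue ordering, not an executor.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

/** Hypothetical base class playing the role of CustomAsyncTask. */
abstract class BaseTask {
    abstract int getPriority();
}

/** Plays the role of DownloadTask. */
class DownloadJob extends BaseTask {
    @Override
    int getPriority() {
        return 10;
    }
}

/** Plays the role of some other subclass that shares the same queue. */
class CleanupJob extends BaseTask {
    @Override
    int getPriority() {
        return 1;
    }
}

/** Written against the base class, so every subclass can be compared. */
class BaseTaskComparator implements Comparator<BaseTask> {
    @Override
    public int compare(BaseTask a, BaseTask b) {
        // Assumed ordering: the larger priority value leaves the queue first.
        return Integer.compare(b.getPriority(), a.getPriority());
    }
}

class MixedQueueDemo {
    /** Offers mixed subclasses and returns their class names in polling order. */
    static String drainOrder() {
        PriorityBlockingQueue<BaseTask> queue =
                new PriorityBlockingQueue<BaseTask>(10, new BaseTaskComparator());
        queue.offer(new CleanupJob());
        queue.offer(new DownloadJob());
        queue.offer(new CleanupJob());

        StringBuilder names = new StringBuilder();
        BaseTask next;
        while ((next = queue.poll()) != null) {
            names.append(next.getClass().getSimpleName()).append(' ');
        }
        return names.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(drainOrder());
    }
}
```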

Upvotes: 2
