gvasquez
gvasquez

Reputation: 2017

Handling priority in ThreadPools

Currently, we have 2 thread pools in one of our apps:

  1. The first one is the one used to handle scheduled tasks
  2. The second deals with parallel processing of each scheduled task being run

The need to set up two different pools came from the following thinking: if several scheduled tasks are queued up in the main (first) pool and it triggers it's subtasks (parallel processing) in the same pool this would result in a race condition as would also get queued "behind" the other scheduled tasks, so nothing would actually end and deadlock would occur.

What if subtasks have higher priority than scheduled tasks? Would they "jump" the queue and pause scheduled tasks in order to finish? Or that won't happen? Is there some way to force such behavior? Or tasks can't be paused when an ThreadPoolExecutor is already running them?

Pool 1 is defined in Spring's application context XML config file as:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:task="http://www.springframework.org/schema/task"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"  xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
 http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
 http://www.springframework.org/schema/aop
 http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
 http://www.springframework.org/schema/context
 http://www.springframework.org/schema/context/spring-context-3.0.xsd
 http://www.springframework.org/schema/task
 http://www.springframework.org/schema/task/spring-task-3.0.xsd">

    <context:annotation-config />
    <context:component-scan base-package="cl.waypoint.mailer.reportes" />
    <task:annotation-driven scheduler="myScheduler" />
    <task:scheduler id="myScheduler" pool-size="2" />
    <aop:aspectj-autoproxy />


</beans>

Pool 2 is defined in code as follows:

public static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 30, TimeUnit.SECONDS,
        new LinkedBlockingDeque<Runnable>(), new ThreadFactory() {
            final AtomicLong count = new AtomicLong(0);
            private String namePreffix = "TempAndDoor";

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(false);
                t.setPriority(Thread.NORM_PRIORITY);
                t.setName(MessageFormat.format("{0}-{1}", namePreffix, count.getAndIncrement()));
                return t;
            }
        });

Upvotes: 0

Views: 3012

Answers (2)

Subhadeep Ray
Subhadeep Ray

Reputation: 969

Following piece of code may work for you.

com.job.Job

package com.job;

import java.util.concurrent.CountDownLatch;

public class Job implements Runnable {

    public enum JobPriority {
        HIGH, MEDIUM, LOW
    }

    private String jobName;
    private JobPriority jobPriority;
    private CountDownLatch cdl;

    public Job(String jobName, JobPriority jobPriority) {
        super();
        this.jobName = jobName;
        this.jobPriority = jobPriority;
    }

    @Override
    public void run() {
        try {
            System.out.println("Job:" + jobName + " Priority:" + jobPriority);
            Thread.sleep(3000);

        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            cdl.countDown();
        }
    }

    public String getJobName() {
        return jobName;
    }

    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    public JobPriority getJobPriority() {
        return jobPriority;
    }

    public void setJobPriority(JobPriority jobPriority) {
        this.jobPriority = jobPriority;
    }

    public void initCdl(CountDownLatch countDownLatch) {
        this.cdl = countDownLatch;
    }

    // standard setters and getters

}

com.scheduler.PriorityJobScheduler

package com.scheduler;

import java.util.Comparator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;

import com.job.Job;

public class PriorityJobScheduler {

    private ExecutorService priorityJobPoolExecutor;
    private ExecutorService priorityJobScheduler = Executors.newSingleThreadExecutor();
    private PriorityBlockingQueue<Job> priorityQueue;
    private CountDownLatch countDownLatch;
    private int jobCount = 0;

    public PriorityJobScheduler(Integer poolSize, Integer queueSize) {
        priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize);
        countDownLatch = new CountDownLatch(poolSize);
        priorityQueue = new PriorityBlockingQueue<Job>(queueSize, Comparator.comparing(Job::getJobPriority));
        priorityJobScheduler.execute(() -> {
            while (true) {
                try {

                    Job j = priorityQueue.take();

                    j.initCdl(countDownLatch);

                    priorityJobPoolExecutor.execute(j);

                    jobCount++;

                    if (jobCount >= poolSize) {
                      countDownLatch.await();
                      jobCount = 0 ;
                      countDownLatch = new CountDownLatch(poolSize);
                }

                } catch (InterruptedException e) {
                    // exception needs special handling
                    break;
                }
            }
        });
    }

    public void scheduleJob(Job job) {
        priorityQueue.add(job);
    }

    public void cleanUp() {

        priorityJobScheduler.shutdown();
    }

}

Test

import java.util.ArrayList;
import java.util.List;

import com.job.Job;
import com.job.Job.JobPriority;
import com.scheduler.PriorityJobScheduler;

public class Test {

    private static int POOL_SIZE = 3;
    private static int QUEUE_SIZE = 10000;

    public static void main(String[] args) {

        
        PriorityJobScheduler pjs = new PriorityJobScheduler(POOL_SIZE, QUEUE_SIZE);

        for (int i = 0; i < 100; i++) {
            Job job1 = new Job("Job" + i + "low", JobPriority.LOW);
            Job job2 = new Job("Job" + i + "medium", JobPriority.MEDIUM);
            Job job3 = new Job("Job" + i + "high", JobPriority.HIGH);

            if (i < 30)
                pjs.scheduleJob(job1);
            else if (i < 60)
                pjs.scheduleJob(job2);
            else
                pjs.scheduleJob(job3);

            try {
                Thread.sleep(5);
            } catch (InterruptedException e) { // TODO
                e.printStackTrace();
            }

        }

        pjs.cleanUp();

    }

}

Source : https://www.baeldung.com/java-priority-job-schedule

The original example at Baeldung, doesn't have the usage of CountDownLatch in this code-piece. You may also go through the tutorial at Baeldung website. Please feel free to ask in the comment section, if you have any queries regarding the usage of CountDownLatch in this scenario.

Upvotes: 1

Luis Ramirez-Monterosa
Luis Ramirez-Monterosa

Reputation: 2242

If I understand correctly myScheduler (pool1) is used to schedule tasks and once time kicks in it submits tasks to executor (pool2).

given the question

What if subtasks have higher priority than scheduled tasks?

is not clear how you differentiate between them or what you really mean but what I understand is that subtasks are the scheduled tasks.

I think you should have some code like:

@Scheduled(cron="*/5 * * * * MON-FRI")
public void doSomething() {
    executor.submit(aTaskThatRunsInParallelOrWhatYouCallASubTask);
}

where executor is the static object you are creating in. (shouldn't it be all caps and final ;))

as per its long name, what you are submitting to executor are "sub tasks" or the tasks the executor wakes up to submit. In this case your myScheduler will wake up always in time since what executes is non-blocking.

On the other hand, since you have a LinkedBlockingDeque which means executes in order, your executor might get backed up.

another Question:

Would they "jump" the queue and pause scheduled tasks in order to finish?

they won't jump, scheduler kicks in, submits new tasks and goes dormant again. Queue starts getting filled

next question:

Is there some way to force such behavior? Or tasks can't be paused when an ThreadPoolExecutor is already running them?

you could cancel the task all together, but you are going to keep track of all tasks submitted

you could catch a future

Future aFuture = executor.submit(aTaskThatRunsInParallelOrWhatYouCallASubTask);

and somehow you need to know that you actually want to cancel the task, you need to invoke

aFuture.cancel();

looks what you need is more involved, I would suggest look at JMS which is very mature or AKKA that might be easier to grasp.

Upvotes: 2

Related Questions