Reputation: 3
Case: suppose you have a queue of tasks (task1, task2, task3, task1, task2, task3, ...). How can I execute the tasks in the queue using exactly 2 threads?
requirement:
1. One thread should execute task1 and task2, and the other should execute task3 (or the other way round), because of some constraints, e.g. limited external resources.
2. task3 should always be executed after task1 and task2 have finished executing, in exactly the order of the queue.
3. Different situations should be considered, e.g. the time taken by task1, task2 and task3 may be totally different.
4. It should not end up in a dead loop (i.e. it must never spin or block forever).
CODE
/**
 * Demo driver: queues ten Task1 / Task2 / Task3 triples, hands the queue to
 * the shared {@code BaseRunManager} and starts one BeforeWards and one
 * AfterWards worker thread.
 */
public class testRunManager {
    public static void main(String[] args) {
        // Build the queue in the repeating order task1, task2, task3.
        final ConcurrentLinkedQueue<BaseTask> pending = new ConcurrentLinkedQueue<>();
        int round = 0;
        while (round < 10) {
            pending.add(new Task1());
            pending.add(new Task2());
            pending.add(new Task3());
            round++;
        }

        // getInstance() always returns the same singleton, so fetch it once.
        final BaseRunManager manager = BaseRunManager.getInstance();
        manager.addTasks(pending);

        final Thread beforeWorker = manager.getNewThread(TaskThread.Type.BeforeWards);
        final Thread afterWorker = manager.getNewThread(TaskThread.Type.AfterWards);

        // start
        beforeWorker.start();
        afterWorker.start();
    }
}
/**
 * A {@link Thread} tagged with the kind of work it is meant to pick up.
 */
public class TaskThread extends Thread {

    /** Role assigned to a worker thread. */
    enum Type {
        BeforeWards, AfterWards
    }

    /** Role of this thread, assigned by the constructor. */
    Type type;

    /**
     * Creates a thread that runs {@code runnable} and carries the given role.
     *
     * @param type     role of the new thread
     * @param runnable body executed when the thread runs
     */
    public TaskThread(Type type, Runnable runnable) {
        super(runnable);
        this.type = type;
    }
}
/**
 * Marker interface with no members.
 * The worker loop in BaseRunManager checks tasks against this type with
 * instanceof; tasks that implement it are the ones picked up by threads of
 * type TaskThread.Type.BeforeWards.
 */
public interface ShouldRunBeforeWardsJob {
}
/**
 * Marker interface with no members.
 * The worker loop in BaseRunManager checks tasks against this type with
 * instanceof; tasks that implement it are the ones picked up by threads of
 * type TaskThread.Type.AfterWards.
 */
public interface ShouldRunAfterWardsJob {
}
/**
 * Base class holding the control state of a pausable / steppable / stoppable
 * runner, plus the lock and condition subclasses use to coordinate with it.
 *
 * <p>Two values are tracked: the internal {@link ControlState} (what has been
 * requested or reached) and the public {@link Performance} derived from it in
 * {@link #setState}. Listeners are told about every change of
 * {@link Performance}.
 *
 * <p>Note that a new controller starts with performance {@code BUSYING}, so
 * {@link #pause()}, {@link #step()}, {@link #stop()} and {@link #resume()} do
 * nothing until a subclass has called {@code setState(RUNNING)} or
 * {@code setState(PAUSED)}.
 */
public abstract class RunController {

    /** Externally visible condition of the controller. */
    public enum Performance {
        BUSYING,
        RUNNING,
        PAUSED,
    }

    /** Internal state: requested transitions and the states they lead to. */
    protected enum ControlState {
        PAUSING,
        PAUSED,
        STOPING,
        RESUMING,
        RUNNING,
        STEPPING,
    }

    // Both fields are written under controlLock but read without it (getters and
    // the guards in pause/step/stop/resume), so they must be volatile for those
    // reads to see the latest value.
    private volatile ControlState state = ControlState.RUNNING;
    private volatile Performance performance = Performance.BUSYING;

    private final List<ControlListener> listeners = new ArrayList<>();

    /** Lock guarding state transitions; also available to subclasses. */
    protected ReentrantLock controlLock = new ReentrantLock();

    /** Signalled by {@link #setState} whenever a transition is requested. */
    protected Condition controlCondition = controlLock.newCondition();

    /** @return the current externally visible performance */
    public Performance getPerformance() {
        return performance;
    }

    /** @return the current internal control state */
    protected ControlState getState() {
        return state;
    }

    /**
     * Registers a listener for performance changes.
     *
     * @param listener listener to add
     */
    public void addListener(ControlListener listener) {
        listeners.add(listener);
    }

    /**
     * Unregisters a listener. Safe to call from inside
     * {@link ControlListener#performChanged}.
     *
     * @param listener listener to remove
     */
    public void removeListener(ControlListener listener) {
        listeners.remove(listener);
    }

    /** Requests a pause; ignored unless the performance is {@code RUNNING}. */
    public void pause() {
        if (performance != Performance.RUNNING) {
            return;
        }
        setState(ControlState.PAUSING);
    }

    /** Requests a single step; ignored unless the performance is {@code PAUSED}. */
    public void step() {
        if (performance != Performance.PAUSED) {
            return;
        }
        setState(ControlState.STEPPING);
    }

    /** Requests a stop; ignored unless the performance is {@code RUNNING} or {@code PAUSED}. */
    public void stop() {
        if (performance != Performance.RUNNING && performance != Performance.PAUSED) {
            return;
        }
        setState(ControlState.STOPING);
    }

    /** Requests a resume; ignored unless the performance is {@code PAUSED}. */
    public void resume() {
        if (performance != Performance.PAUSED) {
            return;
        }
        setState(ControlState.RESUMING);
    }

    /**
     * Stores the new performance and notifies listeners if it actually changed.
     */
    private void setPerformance(Performance p) {
        if (performance != p) {
            Performance old = this.performance;
            this.performance = p;
            // Iterate over a snapshot: a listener that removes itself (or adds
            // another) from the callback would otherwise make the ArrayList
            // iterator throw ConcurrentModificationException or skip a listener.
            for (ControlListener cl : new ArrayList<>(listeners)) {
                cl.performChanged(old, p);
            }
        }
    }

    /**
     * Sets the control state under {@link #controlLock} and derives the
     * performance from it: the transitional states (RESUMING, STEPPING,
     * PAUSING, STOPING) signal {@link #controlCondition} and map to
     * {@code BUSYING}; PAUSED and RUNNING map to the performance of the same
     * name.
     *
     * @param state the new control state
     */
    protected void setState(ControlState state) {
        controlLock.lock();
        try {
            this.state = state;
            switch (this.state) {
                case RESUMING:
                case STEPPING:
                case PAUSING:
                case STOPING:
                    controlCondition.signal();
                    setPerformance(Performance.BUSYING);
                    break;
                case PAUSED:
                    setPerformance(Performance.PAUSED);
                    break;
                case RUNNING:
                    setPerformance(Performance.RUNNING);
                    break;
            }
        } finally {
            controlLock.unlock();
        }
    }

    /** Callback for changes of {@link Performance}. */
    public interface ControlListener {
        /**
         * @param oldState performance before the change
         * @param newState performance after the change
         */
        void performChanged(Performance oldState, Performance newState);
    }
}
/**
 * Base type for the tasks placed on the run queue. It carries a completion
 * {@link State} and leaves the actual work to subclasses.
 */
public abstract class BaseTask {

    /** Completion marker of a task. */
    enum State {
        FINISH, NOT
    }

    /**
     * Completion state. Not initialised here, so it stays {@code null} until a
     * subclass or {@link #setState} assigns it.
     */
    protected State state;

    /** Entry point called by the run manager's worker threads. */
    abstract void runJob();

    /**
     * Not called anywhere in the visible code; presumably the task-specific
     * work that {@link #runJob()} wraps -- confirm against the implementations.
     */
    abstract void doJob();

    /** @param state the new completion state */
    public void setState(State state) {
        this.state = state;
    }

    /** @return the current completion state, possibly {@code null} */
    public State getState() {
        return state;
    }
}
/**
 * Singleton that owns the task queue and hands out the worker threads that
 * drain it.
 *
 * <p>Scheduling rules implemented by the workers:
 * <ul>
 *   <li>Tasks are claimed strictly from the head of the queue.</li>
 *   <li>A {@code BeforeWards} thread only claims tasks implementing
 *       {@link ShouldRunBeforeWardsJob}; an {@code AfterWards} thread only
 *       claims tasks implementing {@link ShouldRunAfterWardsJob}.</li>
 *   <li>An after-task is claimed only once every before-task dequeued ahead of
 *       it has finished running.</li>
 *   <li>Tasks run outside the lock, so an after-task can run in parallel with
 *       the before-tasks that follow it in the queue.</li>
 * </ul>
 *
 * <p>A worker exits when it finds the queue empty. Both thread types must be
 * started: a worker whose counterpart never runs will wait indefinitely for
 * the head of the queue to be taken.
 */
public class BaseRunManager {
    private static BaseRunManager instance;

    // volatile: addTasks may replace the queue while workers are reading the field.
    private volatile ConcurrentLinkedQueue<BaseTask> tasks = new ConcurrentLinkedQueue<>();

    // One coordinator shared by every worker thread. Creating one per thread gives
    // each thread its own lock, condition and counter, so they would never
    // actually synchronise with each other.
    private final BaseRunnable coordinator = new BaseRunnable();

    /** @return the lazily created singleton instance */
    public synchronized static BaseRunManager getInstance() {
        if (instance == null) {
            instance = new BaseRunManager();
        }
        return instance;
    }

    public BaseRunManager() {
    }

    /**
     * Replaces the queue drained by the workers with the given one (the
     * previous queue is not merged in).
     *
     * @param tasks queue to process, in execution order
     */
    public void addTasks(ConcurrentLinkedQueue<BaseTask> tasks) {
        this.tasks = tasks;
    }

    /**
     * Creates (but does not start) a worker thread of the given type. All
     * threads returned by this manager share the same coordinator.
     *
     * @param type which kind of task the thread will execute
     * @return a new, unstarted {@link TaskThread}
     */
    public Thread getNewThread(TaskThread.Type type) {
        return new TaskThread(type, coordinator);
    }

    /**
     * Worker body shared by all threads. Uses the {@code controlLock}
     * inherited from {@link RunController} to guard the claim decision.
     */
    private class BaseRunnable extends RunController implements Runnable {
        /** Signalled whenever the queue head or the in-flight count changes. */
        private final Condition queueChanged = controlLock.newCondition();

        /** Before-tasks dequeued but not yet finished; guarded by controlLock. */
        private int beforeInFlight;

        @Override
        public void run() {
            final TaskThread.Type role = ((TaskThread) Thread.currentThread()).type;
            try {
                BaseTask task;
                while ((task = claimNext(role)) != null) {
                    try {
                        // Run outside the lock so the other worker is not blocked.
                        task.runJob();
                    } catch (RuntimeException e) {
                        // A failing task must not kill the worker or stall the queue.
                        e.printStackTrace();
                    } finally {
                        release(role);
                    }
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread and stop working.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Blocks until the head of the queue may be executed by a thread of the
         * given role, removes it and returns it.
         *
         * @return the claimed task, or {@code null} once the queue is empty
         */
        private BaseTask claimNext(TaskThread.Type role) throws InterruptedException {
            controlLock.lock();
            try {
                while (true) {
                    final ConcurrentLinkedQueue<BaseTask> queue = tasks;
                    final BaseTask head = queue.peek();
                    if (head == null) {
                        return null;
                    }
                    final boolean before = head instanceof ShouldRunBeforeWardsJob;
                    final boolean after = head instanceof ShouldRunAfterWardsJob;
                    if (!before && !after) {
                        // No worker type can ever claim this task; leaving it at the
                        // head would block both workers forever, so drop and report it.
                        queue.poll();
                        System.err.println("Skipping task without a run-order marker: " + head);
                        queueChanged.signalAll();
                        continue;
                    }
                    if (before && role == TaskThread.Type.BeforeWards) {
                        queue.poll();
                        beforeInFlight++;
                        queueChanged.signalAll();
                        return head;
                    }
                    if (after && role == TaskThread.Type.AfterWards && beforeInFlight == 0) {
                        queue.poll();
                        queueChanged.signalAll();
                        return head;
                    }
                    // Head belongs to the other role, or earlier before-tasks are
                    // still running: sleep until something changes.
                    queueChanged.await();
                }
            } finally {
                controlLock.unlock();
            }
        }

        /** Records that a thread of the given role finished its task and wakes waiters. */
        private void release(TaskThread.Type role) {
            controlLock.lock();
            try {
                if (role == TaskThread.Type.BeforeWards) {
                    beforeInFlight--;
                }
                queueChanged.signalAll();
            } finally {
                controlLock.unlock();
            }
        }
    }
}
Upvotes: 0
Views: 200
Reputation: 9650
Here is another approach:
/**
 * Runs ten Task1/Task2/Task3 triples on two threads: thread1 executes every
 * Task1 and Task2 in queue order and forwards each Task3 to a second queue,
 * which thread2 drains. A Task3 is therefore only run after the Task1 and
 * Task2 ahead of it have finished.
 *
 * <p>An end-of-work marker is appended to the queue so both threads terminate
 * once all tasks are processed (without it they would block in take() forever
 * and the JVM would never exit).
 */
public static void main(String[] args) {
    final BlockingQueue<BaseTask> tasks = new LinkedBlockingQueue<>();
    final BlockingQueue<BaseTask> queue2 = new LinkedBlockingQueue<>();
    // Compared by identity only; never executed.
    final BaseTask endOfWork = new BaseTask("END");
    for (int index = 0; index < 10; index++) {
        tasks.add(new BaseTask("Task1"));
        tasks.add(new BaseTask("Task2"));
        tasks.add(new BaseTask("Task3"));
    }
    tasks.add(endOfWork);
    Thread thread1 = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                // Position within the current triple: 0 and 1 are run here, 2 is handed over.
                int position = 0;
                while (true) {
                    BaseTask task = tasks.take();
                    if (task == endOfWork) {
                        break;
                    }
                    if (position < 2) {
                        task.run();
                    } else {
                        queue2.offer(task);
                    }
                    position = (position + 1) % 3;
                }
            } catch (InterruptedException ex) {
                Logger.getLogger(Main.class.getName()).log(Level.SEVERE, null, ex);
                // Restore the flag and stop instead of looping on a swallowed interrupt.
                Thread.currentThread().interrupt();
            } finally {
                // Always release thread2, even if this thread stops early.
                // The queue is unbounded, so offer cannot fail.
                queue2.offer(endOfWork);
            }
        }
    });
    Thread thread2 = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                while (true) {
                    BaseTask task = queue2.take();
                    if (task == endOfWork) {
                        return;
                    }
                    task.run();
                }
            } catch (InterruptedException ex) {
                Logger.getLogger(Main.class.getName()).log(Level.SEVERE, null, ex);
                Thread.currentThread().interrupt();
            }
        }
    });
    thread2.start();
    thread1.start();
}
/**
 * Named demo task: running it prints the task name followed by the name of
 * the executing thread in square brackets.
 */
private static class BaseTask implements Runnable {

    /** Label printed when the task runs. */
    private final String name;

    /** @param name label printed when the task runs */
    public BaseTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        final String worker = Thread.currentThread().getName();
        System.out.println(name + " [" + worker + "]");
    }
}
Upvotes: 0
Reputation: 2626
In our latest project we tried something like this. You can put dependent tasks in a single worker thread so that they execute sequentially.
/**
 * A Runnable that exposes an integer key.
 * The accompanying execute(Runnable) method derives the worker index from
 * this key, so commands that return the same key are handed to the same
 * worker.
 */
public interface SerializedRunnable extends Runnable {
/** Returns the key used to choose the worker for this command. */
int getKey();
}
public void execute(Runnable command) {
final int key;
if (command instanceof SerializedRunnable ) {
key = ((SerializedRunnable ) command).getKey();
}
final int index =Math.abs(key) % size;
workers[index].execute(command);
Upvotes: 0