Reputation: 5698
I need the following scenario: run all ScheduledFutures within a cycle and call the method tasksCompleted() each time all tasks of that cycle have finished executing. The next scheduling cycle must not wait for the tasksCompleted() call of the current cycle.
In short: call a method after the current scheduling cycle has completed, without holding up the next scheduling cycle.
The following code creates the tasks, and the scheduling works. However, I am not able to call tasksCompleted() when all tasks within a cycle have completed.
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class Scheduler {
    public static void main(String[] args) {
        final ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        System.out.println("- [" + LocalTime.now() + "] run parent-task...");
        // create 3 tasks: each task needs 7 seconds.
        var tasks = createTasks("child", 3, 7);
        List<ScheduledFuture<?>> futures = new ArrayList<>();
        tasks.forEach(t -> {
            ScheduledFuture<?> future = ses.scheduleWithFixedDelay(t, 0, 2, TimeUnit.SECONDS);
            futures.add(future);
        });
        // this does not work..
        var scheduleCycleCompleted = futures.stream().allMatch(f -> f.isDone());
        System.out.println("scheduleCycleCompleted: " + scheduleCycleCompleted);
        // maybe a solution with CompletableFuture?
        // (commented out: a ScheduledFuture is not a CompletableFuture, so this copy fails at runtime)
        // CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);
    }

    static void tasksCompleted() {
        System.out.println("schedule cycle completed");
    }

    static List<Runnable> createTasks(String group, int numbersOfTasks, long taskDuration) {
        var tasks = new ArrayList<Runnable>();
        for (var i = 0; i < numbersOfTasks; i++) {
            int taskNr = i;
            Runnable task = () -> {
                System.out.println("- [" + LocalTime.now() + "] Running " + group + "-task" + taskNr + "...[needs "
                        + taskDuration + " seconds]");
                try {
                    TimeUnit.SECONDS.sleep(taskDuration);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                }
            };
            tasks.add(task);
        }
        return tasks;
    }
}
Upvotes: 2
Views: 1204
Reputation: 15008
To handle tasks with different runtimes within a scheduling cycle, you need a way to identify which cycle (group) each task execution belongs to. You can do that by giving every task its own run counter: each time the task executes, the counter's current value names the group that execution belongs to.
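import java.time.LocalTime;
import java.util.concurrent.atomic.AtomicInteger;

interface GroupedRunnable extends Runnable {
    String getGroup();
}

class CountingRunnable implements GroupedRunnable {
    // Number of completed runs of this task; its value names the current group.
    private final AtomicInteger counter = new AtomicInteger();
    private final Runnable delegate;
    private final String taskName;

    CountingRunnable(Runnable delegate, String taskName) {
        this.delegate = delegate;
        this.taskName = taskName;
    }

    @Override
    public void run() {
        // Remember the group up front so that both log lines report the same one.
        String group = getGroup();
        System.out.printf("[%s] - Running task %s in group %s%n", LocalTime.now(), taskName, group);
        delegate.run();
        counter.incrementAndGet();
        System.out.printf("[%s] - Running task %s in group %s finished%n", LocalTime.now(), taskName, group);
    }

    // Note: once run() has returned, this already reports the incremented counter.
    @Override
    public String getGroup() {
        return counter.toString();
    }
}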
Now you can have a watchdog class that keeps track of which members of which group have already finished. Since you know how many members a group has, a simple counter per group is sufficient.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class GroupMonitoringService {
    // key: group, value: number of tasks of that group that have finished
    private final Map<String, AtomicInteger> finishedTasks = new HashMap<>();
    private final Runnable finisher;
    private final int groupSize;

    GroupMonitoringService(Runnable finisher, int groupSize) {
        this.finisher = finisher;
        this.groupSize = groupSize;
    }

    // synchronized guards the HashMap, which is not thread-safe on its own.
    // The finisher runs while the lock is held, so it should return quickly.
    public synchronized void taskFinished(String group) {
        var finishedInGroup = finishedTasks.computeIfAbsent(group, k -> new AtomicInteger());
        if (finishedInGroup.incrementAndGet() >= groupSize) {
            // scheduling group complete
            System.out.printf("Group %s finished executing%n", group);
            finisher.run();
            finishedTasks.remove(group);
        }
    }
}
Now all you have to do is wrap your original task in a grouped task and make sure the monitoring service is notified when it has finished, so wrap it once more.
// GROUP_SIZE, MONITORING_SERVICE and RandomWaitTask are not shown in this snippet.
private static List<Runnable> createTasks() {
    List<Runnable> result = new ArrayList<>();
    for (int i = 0; i < GROUP_SIZE; i++) {
        RandomWaitTask originalTask = new RandomWaitTask();
        CountingRunnable groupedTask = new CountingRunnable(originalTask, "Task " + i);
        Runnable notifyingRunnable = () -> {
            groupedTask.run();
            // getGroup() is read after run(), so every task reports the same
            // (already incremented) value for a given cycle.
            MONITORING_SERVICE.taskFinished(groupedTask.getGroup());
        };
        result.add(notifyingRunnable);
    }
    return result;
}
So now you can just schedule those.
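The scheduling step could look roughly like the following self-contained sketch. It does not reuse the classes above: CycleDemo, CycleMonitor and runCycles are hypothetical names, and the millisecond timings are an assumption chosen only so that the demo finishes quickly. Each task keeps its own run counter as the cycle number, and the completion callback is invoked outside the lock so that tasks already working on the next cycle are not held up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

class CycleDemo {

    /** Counts finished tasks per cycle and reports a cycle once all of its tasks are done. */
    static final class CycleMonitor {
        private final Map<Integer, Integer> finishedPerCycle = new HashMap<>();
        private final int groupSize;
        private final IntConsumer onCycleComplete;

        CycleMonitor(int groupSize, IntConsumer onCycleComplete) {
            this.groupSize = groupSize;
            this.onCycleComplete = onCycleComplete;
        }

        void taskFinished(int cycle) {
            boolean complete;
            synchronized (this) {
                complete = finishedPerCycle.merge(cycle, 1, Integer::sum) == groupSize;
                if (complete) {
                    finishedPerCycle.remove(cycle);
                }
            }
            if (complete) {
                // Called outside the lock so other tasks can keep reporting.
                onCycleComplete.accept(cycle);
            }
        }
    }

    /** Schedules groupSize periodic tasks and waits until at least `cycles` cycles have completed. */
    static List<Integer> runCycles(int groupSize, int cycles) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(groupSize);
        List<Integer> completed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(cycles);
        CycleMonitor monitor = new CycleMonitor(groupSize, cycle -> {
            completed.add(cycle);
            done.countDown();
        });
        for (int i = 0; i < groupSize; i++) {
            AtomicInteger runs = new AtomicInteger(); // per-task run counter, used as cycle number
            long workMillis = 10L * (i + 1);          // tasks take different amounts of time
            ses.scheduleWithFixedDelay(() -> {
                int cycle = runs.getAndIncrement();
                try {
                    Thread.sleep(workMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // interrupted by shutdownNow(): do not report this run
                }
                monitor.taskFinished(cycle);
            }, 0, 20, TimeUnit.MILLISECONDS);
        }
        try {
            done.await(10, TimeUnit.SECONDS);
            ses.shutdownNow();
            ses.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            ses.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(completed);
    }

    public static void main(String[] args) {
        System.out.println("completed cycles: " + runCycles(3, 3));
    }
}
```

Because the callback runs on whichever worker thread finished last in a cycle, a long-running callback would still delay that one task's next run; handing the callback to a separate executor would avoid that.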
You can see the entire code here (it doesn't run properly on that site because of the resource limits it imposes, but it works if you copy it into your IDE).
Upvotes: 1
Reputation: 66
Updated: a CountDownLatch can solve the problem here. I hope this works for you.
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Scheduler {
    public static void main(String[] args) throws InterruptedException {
        final ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        System.out.println("- [" + LocalTime.now() + "] run parent-task...");
        int noOfTask = 3;
        CountDownLatch countDownLatch = new CountDownLatch(noOfTask);
        TaskComplete taskComplete = new TaskCompleteImpl(noOfTask);
        // create 3 tasks: each task needs 7 seconds.
        List<Runnable> tasks = createTasks("child", noOfTask, 7, countDownLatch, taskComplete);
        List<ScheduledFuture<?>> futures = new ArrayList<>();
        tasks.forEach(t -> {
            ScheduledFuture<?> future = ses.scheduleWithFixedDelay(t, 0, 2, TimeUnit.SECONDS);
            futures.add(future);
        });
    }

    interface TaskComplete {
        void tasksCompleted();
    }

    static class TaskCompleteImpl implements TaskComplete {
        private final int totalTask;
        private int index = 0; // tasks that have finished in the current cycle

        public TaskCompleteImpl(int noOfTask) {
            this.totalTask = noOfTask;
        }

        @Override
        public synchronized void tasksCompleted() {
            index = index + 1;
            if (index == totalTask) {
                System.out.println("schedule cycle completed");
                index = 0; // start counting the next cycle
            }
        }
    }

    static List<Runnable> createTasks(String group, int numbersOfTasks, long taskDuration,
            CountDownLatch countDownLatch, TaskComplete taskComplete) {
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < numbersOfTasks; i++) {
            int taskNr = i;
            Runnable task = () -> {
                System.out.println("- [" + LocalTime.now() + "] Running " + group + "-task" + taskNr + "...[needs "
                        + taskDuration + " seconds]");
                try {
                    TimeUnit.SECONDS.sleep(taskDuration);
                    // A CountDownLatch is one-shot: it lines the tasks up at the end of the
                    // first cycle only. Afterwards await() returns immediately, and the
                    // per-cycle counting is done inside tasksCompleted().
                    countDownLatch.countDown();
                    countDownLatch.await();
                    taskComplete.tasksCompleted();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                }
            };
            tasks.add(task);
        }
        return tasks;
    }
}
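One thing to keep in mind when a single CountDownLatch is shared across scheduling cycles: a latch is one-shot. Once its count has reached zero it cannot be reset, and every later await() returns immediately. The following minimal sketch illustrates this; LatchIsOneShot and its method are hypothetical names used only for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchIsOneShot {

    /** Returns true if an already released latch lets await() through without blocking. */
    static boolean awaitReturnsImmediatelyAfterRelease() {
        CountDownLatch latch = new CountDownLatch(2);
        latch.countDown();
        latch.countDown(); // the count is now 0 and the latch is released for good
        latch.countDown(); // further calls are no-ops; the count stays at 0
        try {
            // With a zero timeout, await() only returns true if the latch is already open.
            return latch.await(0, TimeUnit.MILLISECONDS) && latch.getCount() == 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("released latch stays open: " + awaitReturnsImmediatelyAfterRelease());
    }
}
```

If every cycle needs its own rendezvous point, a fresh latch per cycle or a reusable synchronizer such as CyclicBarrier or Phaser from java.util.concurrent may be a better fit.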
Upvotes: 2