Reputation: 246
I want to use virtual threads, introduced in Java 19, together with ScheduledExecutorService.
I need to schedule some threads to be run every minute. I know that I can use something like this: ScheduledExecutorService executor = Executors.newScheduledThreadPool(100, Thread.ofVirtual().factory());
But it looks like I'm forced to set pool size.
I'm looking for a factory method similar to this: ScheduledExecutorService executor = Executors.newScheduledThreadPool(Thread.ofVirtual().factory());
But I can't find one. I would like to follow the "one virtual thread per task" principle and not be forced to set a fixed number of threads in the pool.
Do you know whether I can use ScheduledExecutorService that way? Or is there an alternative that is adapted to virtual threads?
UPDATE
Let me elaborate on the problem I'm trying to solve. I need to create more than 1000 tasks (I don't know the exact number; I can only estimate it) that run periodically. Some need to run every minute, some every two minutes, and so on.
Those tasks will perform I/O operations (network requests), so virtual threads look like a good choice.
But I need some scheduling functionality to achieve this.
By choosing ScheduledExecutorService I can use methods like:
scheduledThreadPoolExecutor.scheduleAtFixedRate(runnableTask, 60, 60, TimeUnit.SECONDS )
If I didn't need scheduling, I would simply create an executor like this: var executor = Executors.newVirtualThreadPerTaskExecutor()
But a plain ExecutorService doesn't provide any scheduling functionality, so I would be forced to implement scheduling on my own.
So for now the best solution I found is: Executors.newScheduledThreadPool(1000, Thread.ofVirtual().factory());
This generally looks good, but I wonder whether the Java API offers another way to create a scheduled executor without forcing me to set the size of a thread pool, which looks a little strange to me when virtual threads are involved.
Upvotes: 18
Views: 15064
Reputation: 11
You can read this article:
use-executorservice-with-virtual-threads
It explains how to use virtual threads with an executor service. It is important to know that pooling threads is not necessary with virtual threads. As a result, it makes sense to use only ExecutorService implementations without pooling.
If you need ScheduledExecutorService along with using virtual threads, then use the following:
ThreadFactory factory = Thread.ofVirtual().factory();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(0, factory);
Callable<String> scheduledCallable = () -> {
System.out.println("Done");
return "Done";
};
scheduledExecutorService.schedule(scheduledCallable, 1, TimeUnit.SECONDS);
Actually, I copied this example from the article.
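One thing worth checking before relying on a core size of 0: as far as I can tell from the ScheduledThreadPoolExecutor documentation, a scheduled pool acts as a fixed-size pool of corePoolSize threads, and the documentation advises against setting the core size to zero. The sketch below is a way to measure this yourself rather than an authoritative statement. The class name ZeroCorePoolCheck and the maxConcurrency helper are made up for illustration, and the 100 ms sleep stands in for a blocking network call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, not part of the article or the JDK.
final class ZeroCorePoolCheck {

    /** Schedules `tasks` blocking jobs at once and returns the highest number seen running together. */
    static int maxConcurrency(int corePoolSize, int tasks) {
        ScheduledExecutorService executor =
                Executors.newScheduledThreadPool(corePoolSize, Thread.ofVirtual().factory());
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                executor.schedule(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // remember the highest overlap seen
                    try {
                        Thread.sleep(100); // assumption: stand-in for a blocking network call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        done.countDown();
                    }
                }, 0, TimeUnit.MILLISECONDS);
            }
            done.await(); // wait until every job has finished
            return peak.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("core size 0: peak = " + maxConcurrency(0, 4));
        System.out.println("core size 4: peak = " + maxConcurrency(4, 4));
    }
}
```

If the reported peak for core size 0 is 1, the tasks are being run one after another on a single virtual thread, which is not the "one virtual thread per task" behaviour the question asks for.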
Upvotes: 1
Reputation: 632
You can implement a ScheduledExecutorService backed by virtual threads on your own. Here is one in Kotlin:
/**
* Scheduler which uses JVM Scheduler to schedule jobs
*
* */
@BlockingExecutor
class LoomScheduledExecutorService internal constructor(
private val actualImp: ExecutorService,
private val timeSource: TimeSource.WithComparableMarks,
): AbstractExecutorService(), ScheduledExecutorService, Closeable {
// val source: InstantSource = Clock.systemUTC()
constructor(
builder: Thread.Builder.OfVirtual = Thread.ofVirtual().name("virtual-dispatcher",0),
timeSource: TimeSource.WithComparableMarks = TimeSource.Monotonic
):this(Executors.newThreadPerTaskExecutor(builder.factory()), timeSource)
private val sequencer = AtomicLong()
override fun execute(command: java.lang.Runnable) =
actualImp.execute(command)
override fun shutdown() =
actualImp.shutdown()
override fun awaitTermination(timeout: Long, unit: TimeUnit) =
actualImp.awaitTermination(timeout, unit)
override fun isShutdown() = actualImp.isShutdown
override fun isTerminated() = actualImp.isTerminated
override fun shutdownNow() = actualImp.shutdownNow()
override fun schedule(
command: Runnable,
delay: Long,
unit: TimeUnit
): ScheduledFuture<*> {
// val ins = source.instant()
val duration = delay.toDuration(unit.toDurationUnit())
val mark = timeSource.markNow() + duration
val task = DelayedTask(
command,
Unit,
mark,
sequencer.getAndIncrement()
)
execute(task)
return task
}
override fun <V : Any?> schedule(callable: Callable<V>, delay: Long, unit: TimeUnit): ScheduledFuture<V> {
val duration = delay.toDuration(unit.toDurationUnit())
val task = DelayedTask(
callable,
timeSource.markNow() + duration,
sequencer.getAndIncrement()
)
execute(task)
return task
}
override fun scheduleAtFixedRate(
command: Runnable,
initialDelay: Long,
period: Long,
unit: TimeUnit
): ScheduledFuture<*> {
/**
*
* initialDelay,
* then initialDelay + period,
* then initialDelay + 2 * period
* */
val taskPeriod = period.toDuration(unit.toDurationUnit())
if (taskPeriod <= Duration.ZERO) {
throw IllegalArgumentException("period <= 0")
}
val mark = timeSource.markNow() +
initialDelay
.toDuration(unit.toDurationUnit())
val task = FixedRateTask(command, Unit, mark, taskPeriod, sequencer.getAndIncrement())
execute(task)
return task
}
override fun scheduleWithFixedDelay(
command: Runnable,
initialDelay: Long,
delay: Long,
unit: TimeUnit
): ScheduledFuture<*> {
/**
* initial delay,
* and subsequently
* with the given delay between
* the termination of one execution
* and the commencement of the next.
* */
val taskPeriod = delay.toDuration(unit.toDurationUnit())
if (taskPeriod <= Duration.ZERO) {
throw IllegalArgumentException("delay <= 0")
}
val mark = timeSource.markNow() +
initialDelay
.toDuration(unit.toDurationUnit())
val task = PeriodicTask(command, Unit, mark, taskPeriod, sequencer.getAndIncrement())
execute(task)
return task
}
override fun close() = super<AbstractExecutorService>.close()
private data class Sleeping(
private val expectedMark: ComparableTimeMark
): Runnable {
override fun run() {
Thread.sleep(expectedMark.elapsedNow().unaryMinus().toJavaDuration())
}
}
private class RepeatableFutureTask<V>:FutureTask<V> {
constructor(r: Runnable, result:V):super(r,result)
constructor(c:Callable<V>):super(c)
public override fun runAndReset(): Boolean {
return super.runAndReset()
}
}
sealed class BaseTask<V>: FutureTask<V>, RunnableScheduledFuture<V> {
val mark: ComparableTimeMark
val sequenceNumber:Long
constructor(
r:Runnable,
result: V,
sequenceNumber:Long,
mark: ComparableTimeMark
): super(r, result) {
this.sequenceNumber = sequenceNumber
this.mark = mark
}
constructor(
c: Callable<V>,
sequenceNumber:Long,
mark: ComparableTimeMark
): super(c) {
this.sequenceNumber = sequenceNumber
this.mark = mark
}
override fun isPeriodic() = false
override fun compareTo(other: Delayed): Int {
if (other == this) {
return 0;
}
return if (other is BaseTask<*>) {
// Order by trigger time first; fall back to submission order on ties.
val byMark = expectedMark.compareTo(other.expectedMark)
if (byMark != 0) byMark else sequenceNumber.compareTo(other.sequenceNumber)
} else {
val diff = getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)
if (diff < 0) -1 else if (diff > 0) 1 else 0
}
}
override fun getDelay(unit: TimeUnit): Long {
return expectedMark.elapsedNow().unaryMinus()
.toLong(unit.toDurationUnit())
}
open val expectedMark:ComparableTimeMark
get() = mark
}
private class DelayedTask<V> : BaseTask<V> {
constructor(
r:Runnable,
result: V,
triggerTime: ComparableTimeMark,
sequenceNumber:Long
): super(r, result, sequenceNumber, triggerTime)
constructor(
c: Callable<V>,
triggerTime: ComparableTimeMark,
sequenceNumber:Long
): super(c, sequenceNumber, triggerTime)
override fun isPeriodic() = false
private val childTask = FutureTask(Sleeping(mark), Unit)
override fun run() {
childTask.run()
if (childTask.isCancelled) {
super.cancel(false)
}
super.run()
}
override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
childTask.cancel(true)
return super.cancel(mayInterruptIfRunning)
}
}
private class FixedRateTask<V>: BaseTask<V> {
private val period: Duration
@Volatile
private var virtualMark:ComparableTimeMark
private val initialDelay = FutureTask(Sleeping(mark), Unit)
private val delayTask:RepeatableFutureTask<Unit>
constructor(
r:Runnable,
result: V,
triggerTime: ComparableTimeMark,
period: Duration,
sequenceNumber:Long
): super(r, result, sequenceNumber, triggerTime) {
this.period = period
this.virtualMark = mark
val job = Runnable { Thread.sleep(virtualMark.elapsedNow().unaryMinus().toJavaDuration()) }
this.delayTask = RepeatableFutureTask(job,Unit)
}
constructor(
c: Callable<V>,
triggerTime: ComparableTimeMark,
period: Duration,
sequenceNumber:Long
): super(c, sequenceNumber, triggerTime) {
this.period = period
this.virtualMark = mark
val job = Runnable { Thread.sleep(virtualMark.elapsedNow().unaryMinus().toJavaDuration()) }
this.delayTask = RepeatableFutureTask(job,Unit)
}
override fun run() {
initialDelay.run()
if (initialDelay.isCancelled) {
super.cancel(false)
delayTask.cancel(false)
}
var before = mark + mark.elapsedNow()
while (runAndReset()) {
val after = mark + mark.elapsedNow()
val workDuration = after - before
// before = after
if (workDuration < period) {
virtualMark = after + (period - workDuration)
if (!delayTask.runAndReset()) {
cancel(false)
break
}
} else {
// run job ASAP
virtualMark = after
}
before = mark + mark.elapsedNow()
}
}
override fun isPeriodic() = true
override val expectedMark get() = virtualMark
override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
initialDelay.cancel(true)
delayTask.cancel(true)
return super.cancel(mayInterruptIfRunning)
}
}
private class PeriodicTask<V>: BaseTask<V> {
private val period: Duration
@Volatile // read by getDelay()/compareTo() from other threads, as in FixedRateTask
private var virtualMark:ComparableTimeMark
constructor(
r:Runnable,
result: V,
triggerTime: ComparableTimeMark,
period: Duration,
sequenceNumber:Long
): super(r, result, sequenceNumber, triggerTime) {
this.period = period
this.virtualMark = mark
this.delayTask = RepeatableFutureTask(Sleep(period),Unit)
}
constructor(
c: Callable<V>,
triggerTime: ComparableTimeMark,
period: Duration,
sequenceNumber:Long
): super(c, sequenceNumber, triggerTime) {
this.period = period
this.virtualMark = mark
this.delayTask = RepeatableFutureTask(Sleep(period),Unit)
}
private val initialDelay = FutureTask(Sleeping(mark), Unit)
private val delayTask:RepeatableFutureTask<Unit>
private data class Sleep(private val duration: Duration):Runnable {
override fun run() {
Thread.sleep(duration.toJavaDuration())
}
}
override fun run() {
initialDelay.run()
if (initialDelay.isCancelled) {
super.cancel(false)
delayTask.cancel(false)
}
while (runAndReset()) {
virtualMark = mark + mark.elapsedNow() + period
if (!delayTask.runAndReset()) {
cancel(false)
break
}
}
}
override fun isPeriodic() = true
override val expectedMark get() = virtualMark
override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
initialDelay.cancel(true)
delayTask.cancel(true)
return super.cancel(mayInterruptIfRunning)
}
}
}
Upvotes: -4
Reputation: 90023
Honestly, I think everyone is overthinking this. The main benefit of Virtual Threads is the ability to spin up one virtual thread per task, and implement the task using synchronous APIs. Here is how I would implement this:
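Thread.ofVirtual().start(() ->
{
    try
    {
        Thread.sleep(...); // Sleep until you want the task to run
        task.run();
        // Tada!
    }
    catch (InterruptedException e) { /* interrupted before the task ran; treat as cancelled */ }
});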
ScheduledExecutorService is a thread pool. You are not supposed to use thread pools with virtual threads.
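For the periodic tasks in the question, the same idea could be extended to a loop. This is only a minimal sketch under a few assumptions: the class name VirtualThreadPerTask and the runPeriodically helper are hypothetical, interrupting the thread is used as the cancellation signal, and the sleep gives a fixed delay between runs rather than a fixed rate.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class for illustration only.
final class VirtualThreadPerTask {

    /** Starts one virtual thread that runs `task` repeatedly until the thread is interrupted. */
    static Thread runPeriodically(Runnable task, Duration initialDelay, Duration period) {
        return Thread.ofVirtual().start(() -> {
            try {
                Thread.sleep(initialDelay);
                while (!Thread.currentThread().isInterrupted()) {
                    task.run();
                    Thread.sleep(period); // fixed delay after each run, not a fixed rate
                }
            } catch (InterruptedException e) {
                // Interruption is the cancellation signal here; let the thread end.
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger runs = new AtomicInteger();
        Thread worker = runPeriodically(runs::incrementAndGet, Duration.ZERO, Duration.ofMillis(50));
        Thread.sleep(300);
        worker.interrupt(); // cancel the periodic task
        worker.join();
        System.out.println("runs: " + runs.get());
    }
}
```

Each task owns its thread, so a slow task delays only itself. An exception thrown by the task ends that task's thread, so wrap task.run() in a try/catch if the task should keep repeating after a failure.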
Upvotes: 6
Reputation: 12347
I think you want to offload the work to virtual threads and schedule it with a separate scheduler.
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
ExecutorService donkey = Executors.newVirtualThreadPerTaskExecutor();
Then, when you want to schedule a task:
void schedule(Runnable command, long delay, TimeUnit unit){
scheduler.schedule( ()->donkey.execute(command), delay, unit);
}
You really don't want your scheduling thread to be split up amongst virtual threads because you want your scheduling thread to be free to schedule more tasks.
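For the periodic case in the question, the same hand-off might look like the sketch below. The class name VirtualThreadScheduler and its method are hypothetical; only the Executors factory methods come from the JDK.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper for illustration only.
final class VirtualThreadScheduler implements AutoCloseable {
    // One platform thread that only keeps time; it never runs the tasks themselves.
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    // A fresh virtual thread for every task execution.
    private final ExecutorService workers = Executors.newVirtualThreadPerTaskExecutor();

    /** Every period, hands `task` to a new virtual thread; the timer thread returns immediately. */
    ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
        return timer.scheduleAtFixedRate(() -> workers.execute(task), initialDelay, period, unit);
    }

    @Override
    public void close() {
        timer.shutdownNow();  // stop producing new triggers
        workers.shutdown();   // let runs already in progress finish
    }

    public static void main(String[] args) throws InterruptedException {
        try (VirtualThreadScheduler scheduler = new VirtualThreadScheduler()) {
            scheduler.scheduleAtFixedRate(
                    () -> System.out.println("tick on " + Thread.currentThread()),
                    0, 100, TimeUnit.MILLISECONDS);
            Thread.sleep(350);
        }
    }
}
```

Two behaviours differ from calling scheduleAtFixedRate directly: runs of the same task can overlap if one takes longer than its period, and cancelling the returned future stops further triggers but does not interrupt a run that is already in progress.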
Upvotes: 25