Piotrold

Reputation: 246

How to use virtual threads with ScheduledExecutorService

I want to use virtual threads, introduced in Java 19 as a preview feature, together with ScheduledExecutorService. I need to schedule some tasks to run every minute. I know that I can use something like this: ScheduledExecutorService executor = Executors.newScheduledThreadPool(100, Thread.ofVirtual().factory()); But it looks like I'm forced to set a pool size.

I'm looking for a factory method similar to this: ScheduledExecutorService executor = Executors.newScheduledThreadPool(Thread.ofVirtual().factory()); But I can't find one. I would like to follow the "one virtual thread per task" principle and not be forced to set a fixed number of threads in the pool. Do you know if I can use ScheduledExecutorService that way? Or is there an alternative that is adapted to virtual threads?

UPDATE

Let me elaborate on the problem I'm trying to solve. I need to create more than 1000 tasks (I don't know the exact number; I can only estimate it) that should run periodically. Some need to run every minute, some every two minutes, etc.

Those tasks will perform I/O operations (network requests), so virtual threads look like a good choice. But I also need some scheduling functionality. By choosing ScheduledExecutorService I can use methods like: scheduledThreadPoolExecutor.scheduleAtFixedRate(runnableTask, 60, 60, TimeUnit.SECONDS);

If I didn't need scheduling, I would simply create an executor like this: var executor = Executors.newVirtualThreadPerTaskExecutor(); But a plain ExecutorService doesn't provide any scheduling functionality, so I would be forced to implement scheduling on my own.

So for now the best solution I've found is: Executors.newScheduledThreadPool(1000, Thread.ofVirtual().factory()); This generally looks good, but I wonder whether the Java API offers another way to create a scheduled executor without forcing me to set the size of a thread pool, which looks a little strange to me when virtual threads are involved.

Upvotes: 18

Views: 15064

Answers (5)

Zoltan Balogh

Reputation: 11

You can read this article:

use-executorservice-with-virtual-threads

It explains how to use virtual threads with an executor service. It is important to know that pooling threads is not necessary with virtual threads. As a result, it makes sense to use only ExecutorService implementations without pooling.

If you need a ScheduledExecutorService along with virtual threads, use the following:

ThreadFactory factory = Thread.ofVirtual().factory();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(0, factory);

Callable<String> scheduledCallable = () -> {
    System.out.println("Done");
    return "Done";
};
scheduledExecutorService.schedule(scheduledCallable, 1, TimeUnit.SECONDS);

Actually, I copied this example from the article.

Upvotes: 1

Watermelon

Reputation: 632

You can implement a virtual-thread-based ScheduledExecutorService on your own. Here is one in Kotlin:

/**
 *  Scheduler which uses JVM Scheduler to schedule jobs
 *
 * */
@BlockingExecutor
class LoomScheduledExecutorService internal constructor(
    private val actualImp: ExecutorService,
    private val timeSource: TimeSource.WithComparableMarks,
): AbstractExecutorService(), ScheduledExecutorService, Closeable {
//    val source: InstantSource = Clock.systemUTC()


    constructor(
        builder: Thread.Builder.OfVirtual = Thread.ofVirtual().name("virtual-dispatcher",0),
        timeSource: TimeSource.WithComparableMarks = TimeSource.Monotonic
    ):this(Executors.newThreadPerTaskExecutor(builder.factory()), timeSource)

    private val sequencer = AtomicLong()

    override fun execute(command: java.lang.Runnable) =
        actualImp.execute(command)

    override fun shutdown() =
        actualImp.shutdown()

    override fun awaitTermination(timeout: Long, unit: TimeUnit) =
        actualImp.awaitTermination(timeout, unit)

    override fun isShutdown() = actualImp.isShutdown

    override fun isTerminated() = actualImp.isTerminated

    override fun shutdownNow() = actualImp.shutdownNow()

    override fun schedule(
        command: Runnable,
        delay: Long,
        unit: TimeUnit
    ): ScheduledFuture<*> {
//        val ins = source.instant()
        val duration = delay.toDuration(unit.toDurationUnit())
        val mark = timeSource.markNow() + duration
        val task = DelayedTask(
            command,
            Unit,
            mark,
            sequencer.getAndIncrement()
        )

        execute(task)
        return task
    }

    override fun <V : Any?> schedule(callable: Callable<V>, delay: Long, unit: TimeUnit): ScheduledFuture<V> {
        val duration = delay.toDuration(unit.toDurationUnit())
        val task = DelayedTask(
            callable,
            timeSource.markNow() + duration,
            sequencer.getAndIncrement()
        )
        execute(task)
        return task
    }

    override fun scheduleAtFixedRate(
        command: Runnable,
        initialDelay: Long,
        period: Long,
        unit: TimeUnit
    ): ScheduledFuture<*> {
        /**
         *
         * initialDelay,
         * then initialDelay + period,
         * then initialDelay + 2 * period
         * */
        val taskPeriod = period.toDuration(unit.toDurationUnit())
        if (taskPeriod <= Duration.ZERO) {
            throw IllegalArgumentException("period <= 0")
        }

        val mark = timeSource.markNow() +
                initialDelay
                    .toDuration(unit.toDurationUnit())
        val task = FixedRateTask(command, Unit, mark, taskPeriod, sequencer.getAndIncrement())
        execute(task)
        return task
    }

    override fun scheduleWithFixedDelay(
        command: Runnable,
        initialDelay: Long,
        delay: Long,
        unit: TimeUnit
    ): ScheduledFuture<*> {
        /**
         *  initial delay,
         *  and subsequently
         *  with the given delay between
         *  the termination of one execution
         *  and the commencement of the next.
         * */

        val taskPeriod = delay.toDuration(unit.toDurationUnit())
        if (taskPeriod <= Duration.ZERO) {
            throw IllegalArgumentException("period <= 0")
        }
        val mark = timeSource.markNow() +
                initialDelay
                    .toDuration(unit.toDurationUnit())
        val task = PeriodicTask(command, Unit, mark, taskPeriod, sequencer.getAndIncrement())
        execute(task)
        return task
    }

    override fun close() = super<AbstractExecutorService>.close()

    private data class Sleeping(
        private val expectedMark: ComparableTimeMark
    ): Runnable {
        override fun run() {
            Thread.sleep(expectedMark.elapsedNow().unaryMinus().toJavaDuration())
        }
    }


    private class RepeatableFutureTask<V>:FutureTask<V> {
        constructor(r: Runnable, result:V):super(r,result)
        constructor(c:Callable<V>):super(c)

        public override fun runAndReset(): Boolean {
            return super.runAndReset()
        }
    }

    sealed class BaseTask<V>: FutureTask<V>, RunnableScheduledFuture<V> {
        val mark: ComparableTimeMark
        val sequenceNumber:Long
        constructor(
            r:Runnable,
            result: V,
            sequenceNumber:Long,
            mark: ComparableTimeMark
        ): super(r, result) {
            this.sequenceNumber = sequenceNumber
            this.mark = mark

        }

        constructor(
            c: Callable<V>,
            sequenceNumber:Long,
            mark: ComparableTimeMark
        ): super(c) {
            this.sequenceNumber = sequenceNumber
            this.mark = mark
        }

        override fun isPeriodic() = false

        override fun compareTo(other: Delayed): Int {
            if (other == this) {
                return 0;
            }
            return if (other is BaseTask<*>) {
                if (other.expectedMark > expectedMark || other.expectedMark < expectedMark) {
                    expectedMark.compareTo(other.expectedMark)
                } else {
                    sequenceNumber.compareTo(other.sequenceNumber)
                }
            } else {
                val diff = getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)
                if (diff < 0) -1 else if (diff > 0) 1 else 0
            }
        }

        override fun getDelay(unit: TimeUnit): Long {
            return expectedMark.elapsedNow().unaryMinus()
                .toLong(unit.toDurationUnit())
        }

        open val expectedMark:ComparableTimeMark
            get() = mark
    }

    private class DelayedTask<V> : BaseTask<V> {

        constructor(
            r:Runnable,
            result: V,
            triggerTime: ComparableTimeMark,
            sequenceNumber:Long
        ): super(r, result, sequenceNumber, triggerTime)

        constructor(
            c: Callable<V>,
            triggerTime: ComparableTimeMark,
            sequenceNumber:Long
        ): super(c, sequenceNumber, triggerTime)


        override fun isPeriodic() = false

        private val childTask = FutureTask(Sleeping(mark), Unit)

        override fun run() {
            childTask.run()
            if (childTask.isCancelled) {
                super.cancel(false)
            }
            super.run()
        }

        override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
            childTask.cancel(true)
            return super.cancel(mayInterruptIfRunning)
        }

    }

    private class FixedRateTask<V>: BaseTask<V> {
        private val period: Duration

        @Volatile
        private var virtualMark:ComparableTimeMark
        private val initialDelay = FutureTask(Sleeping(mark), Unit)
        private val delayTask:RepeatableFutureTask<Unit>

        constructor(
            r:Runnable,
            result: V,
            triggerTime: ComparableTimeMark,
            period: Duration,
            sequenceNumber:Long
        ): super(r, result, sequenceNumber, triggerTime) {
            this.period = period
            this.virtualMark = mark
            val job = Runnable { Thread.sleep(virtualMark.elapsedNow().unaryMinus().toJavaDuration()) }
            this.delayTask = RepeatableFutureTask(job,Unit)
        }

        constructor(
            c: Callable<V>,
            triggerTime: ComparableTimeMark,
            period: Duration,
            sequenceNumber:Long
        ): super(c, sequenceNumber, triggerTime) {
            this.period = period
            this.virtualMark = mark
            val job = Runnable { Thread.sleep(virtualMark.elapsedNow().unaryMinus().toJavaDuration()) }
            this.delayTask = RepeatableFutureTask(job,Unit)
        }

        override fun run() {
            initialDelay.run()
            if (initialDelay.isCancelled) {
                super.cancel(false)
                delayTask.cancel(false)
            }
            var before = mark + mark.elapsedNow()
            while (runAndReset()) {

                val after = mark + mark.elapsedNow()
                val workDuration = after - before
//                before = after
                if (workDuration < period) {
                    virtualMark = after + (period - workDuration)
                    if (!delayTask.runAndReset()) {
                        cancel(false)
                        break
                    }
                } else {
                    // run job ASAP
                    virtualMark = after
                }
                before = mark + mark.elapsedNow()

            }
        }

        override fun isPeriodic() = true

        override val expectedMark get() = virtualMark

        override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
            initialDelay.cancel(true)
            delayTask.cancel(true)
            return super.cancel(mayInterruptIfRunning)
        }
    }

    private class PeriodicTask<V>: BaseTask<V> {
        private val period: Duration

        private var virtualMark:ComparableTimeMark
        constructor(
            r:Runnable,
            result: V,
            triggerTime: ComparableTimeMark,
            period: Duration,
            sequenceNumber:Long
        ): super(r, result, sequenceNumber, triggerTime) {
            this.period = period
            this.virtualMark = mark
            this.delayTask = RepeatableFutureTask(Sleep(period),Unit)
        }

        constructor(
            c: Callable<V>,
            triggerTime: ComparableTimeMark,
            period: Duration,
            sequenceNumber:Long
        ): super(c, sequenceNumber, triggerTime) {
            this.period = period
            this.virtualMark = mark
            this.delayTask = RepeatableFutureTask(Sleep(period),Unit)
        }

        private val initialDelay = FutureTask(Sleeping(mark), Unit)
        private val delayTask:RepeatableFutureTask<Unit>

        private data class Sleep(private val duration: Duration):Runnable {

            override fun run() {
                Thread.sleep(duration.toJavaDuration())
            }
        }

        override fun run() {
            initialDelay.run()
            if (initialDelay.isCancelled) {
                super.cancel(false)
                delayTask.cancel(false)
            }
            while (runAndReset()) {
                virtualMark = mark + mark.elapsedNow() + period
                if (!delayTask.runAndReset()) {
                    cancel(false)
                    break
                }
            }
        }

        override fun isPeriodic() = true

        override val expectedMark get() = virtualMark

        override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
            initialDelay.cancel(true)
            delayTask.cancel(true)
            return super.cancel(mayInterruptIfRunning)
        }
    }

}

Upvotes: -4

JPG

Reputation: 998

Executors.newScheduledThreadPool(0, Thread.ofVirtual().factory());
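A minimal sketch to check what this one-liner does, assuming Java 21 or later. The class name ZeroCorePoolDemo and the helper runsOnVirtualThread are made up for illustration. It schedules a single Callable and reports whether it ran on a virtual thread. One caveat, as far as I understand ScheduledThreadPoolExecutor: it behaves as a fixed-size pool of corePoolSize threads, and with a core size of 0 it starts a single worker on demand, so this is not "one virtual thread per task" and long-running tasks may still queue behind each other.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ZeroCorePoolDemo {

    // Schedules one task on a zero-core pool backed by a virtual thread factory
    // and returns whether the task observed itself running on a virtual thread.
    static boolean runsOnVirtualThread() {
        ScheduledExecutorService executor =
                Executors.newScheduledThreadPool(0, Thread.ofVirtual().factory());
        try {
            Callable<Boolean> probe = () -> Thread.currentThread().isVirtual();
            ScheduledFuture<Boolean> future =
                    executor.schedule(probe, 100, TimeUnit.MILLISECONDS);
            // Wait (with a generous timeout) for the scheduled task to finish.
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("scheduled probe failed", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran on a virtual thread: " + runsOnVirtualThread());
    }
}
```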

Upvotes: 8

Gili

Reputation: 90023

Honestly, I think everyone is overthinking this. The main benefit of Virtual Threads is the ability to spin up one virtual thread per task, and implement the task using synchronous APIs. Here is how I would implement this:

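Thread.ofVirtual().start(() ->
{
  try
  {
    Thread.sleep(...); // Sleep until you want the task to run
    task.run();
    // Tada!
  }
  catch (InterruptedException e)
  {
    Thread.currentThread().interrupt(); // Interrupted while waiting: skip the task
  }
});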

The standard ScheduledExecutorService implementation, ScheduledThreadPoolExecutor, is a thread pool. You are not supposed to use thread pools with virtual threads.
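For the periodic case from the question, the same idea can be stretched into a loop. This is only a sketch, assuming Java 21 or later. VirtualThreadTicker, runPeriodically and countRuns are hypothetical names, not part of any JDK API. Note that sleeping after each run gives fixed-delay behaviour (like scheduleWithFixedDelay), not a fixed rate, and that an exception thrown by the task ends the loop.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

class VirtualThreadTicker {

    // Starts one virtual thread that waits for initialDelay, then runs the task,
    // sleeps for the period, and repeats. Interrupting the thread cancels it.
    static Thread runPeriodically(Runnable task, Duration initialDelay, Duration period) {
        return Thread.ofVirtual().start(() -> {
            try {
                Thread.sleep(initialDelay);
                while (!Thread.currentThread().isInterrupted()) {
                    task.run();
                    Thread.sleep(period);
                }
            } catch (InterruptedException e) {
                // Interrupted while sleeping: treat it as cancellation and exit.
            }
        });
    }

    // Demo helper: counts how often a task runs within the given time window.
    static int countRuns(Duration period, Duration window) {
        AtomicInteger runs = new AtomicInteger();
        Thread ticker = runPeriodically(runs::incrementAndGet, Duration.ZERO, period);
        try {
            Thread.sleep(window);
            ticker.interrupt();
            ticker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        int runs = countRuns(Duration.ofMillis(50), Duration.ofMillis(500));
        System.out.println("runs: " + runs);
    }
}
```

With 1000+ tasks this means 1000+ mostly sleeping virtual threads, which is the kind of workload virtual threads are meant to handle cheaply.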

Upvotes: 6

matt

Reputation: 12347

I think you want to consider offloading the work to virtual threads, and scheduling the work with a scheduler.

ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
ExecutorService donkey = Executors.newVirtualThreadPerTaskExecutor();

Then, when you want to schedule a task:

void schedule(Runnable command, long delay, TimeUnit unit){
    scheduler.schedule( ()->donkey.execute(command), delay, unit);
}

You really don't want your scheduling thread to do the work itself, because you want it to stay free to schedule more tasks. It only hands each task off to a fresh virtual thread, which does the actual work.
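The same hand-off works for the periodic tasks in the question. Here is a minimal sketch, assuming Java 21 or later. The wrapper class VirtualThreadScheduler and its demo helper are hypothetical names. One platform thread only fires the timers, and each firing starts a new virtual thread. Keep in mind that runs can overlap if a task takes longer than its period, and that cancelling the returned future stops future firings but not a run already in flight.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class VirtualThreadScheduler implements AutoCloseable {
    // Single platform thread that only keeps time and hands work off.
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    // Every hand-off runs on its own new virtual thread.
    private final ExecutorService workers = Executors.newVirtualThreadPerTaskExecutor();

    ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
                                           long period, TimeUnit unit) {
        return timer.scheduleAtFixedRate(() -> workers.execute(command),
                initialDelay, period, unit);
    }

    @Override
    public void close() {
        timer.shutdownNow(); // stop firing new runs
        workers.close();     // wait for runs that are already in flight
    }

    // Demo helper: counts the runs that executed on a virtual thread within the window.
    static int demo(long periodMillis, long windowMillis) {
        AtomicInteger virtualRuns = new AtomicInteger();
        try (VirtualThreadScheduler scheduler = new VirtualThreadScheduler()) {
            scheduler.scheduleAtFixedRate(() -> {
                if (Thread.currentThread().isVirtual()) {
                    virtualRuns.incrementAndGet();
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            Thread.sleep(windowMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return virtualRuns.get();
    }

    public static void main(String[] args) {
        System.out.println("virtual runs: " + demo(50, 500));
    }
}
```

For the question's use case you would call scheduleAtFixedRate once per task, with 60 or 120 and TimeUnit.SECONDS, and no pool size has to be chosen anywhere.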

Upvotes: 25
