Mookayama
Mookayama

Reputation: 1343

calling a function with delay

I got this practice interview question.

Implement a job scheduler which takes in a function f and an integer n, and calls f after n milliseconds.

I have a very simple solution:

import time

def schedulerX(f,n):
    time.sleep(0.001*n)
    f

However, the suggested solution is much more elaborated as below. I dont understand whats the purpose behind all this extra code. Please enlightened me.

from time import sleep
import threading

class Scheduler:
    def __init__(self):
        self.fns = [] # tuple of (fn, time)
        t = threading.Thread(target=self.poll)
        t.start()

    def poll(self):
        while True:
            now = time() * 1000
            for fn, due in self.fns:
                if now > due:
                    fn()
            self.fns = [(fn, due) for (fn, due) in self.fns if due > now]
            sleep(0.01)

    def delay(self, f, n):
        self.fns.append((f, time() * 1000 + n))

Upvotes: 5

Views: 3378

Answers (2)

almiki
almiki

Reputation: 455

As others have pointed out, your solution is 'blocking': it prevents anything else from happening while it's waiting to run. The intent of the suggested solution is to let you schedule the job and then carry on with other stuff in the meantime.

As for an explanation of what the suggested code is doing:

You'd first create a Scheduler, which would start its own thread that effectively runs in the background, and which will run the jobs.

scheduler = Scheduler()

In your code, you could then schedule whatever jobs you want without having to wait for them to run:

def my_recurring_job():
    # Do some stuff in the background, then re-run this job again
    # in one second.

    ### Do some stuff ###

    scheduler.delay(my_recurring_job, 1000)

scheduler.delay(lambda: print("5 seconds passed!"), 5 * 1000)
scheduler.delay(lambda: print("2 hours passed!"), 2 * 60 * 60 * 1000)
scheduler.delay(my_recurring_job, 1000)

# You can keep doing other stuff without waiting

The scheduler's thread is just looping forever in its poll method, running any jobs whose time has come, and then sleeping 0.01 seconds and checking again. There's a small bug in the code where if now == due, the job won't run, but it also won't be kept for later. It should be if now >= due: instead.

A more advanced scheduler might use a threading.Condition instead of polling 100 times a second:

import threading
from time import time

class Scheduler:
    def __init__(self):
        self.fns = [] # tuple of (fn, time)

        # The lock prevents 2 threads from messing with fns at the same time;
        # also lets us use Condition
        self.lock = threading.RLock()

        # The condition lets one thread wait, optionally with a timeout,
        # and lets other threads wake it up
        self.condition = threading.Condition(self.lock)

        t = threading.Thread(target=self.poll)
        t.start()

    def poll(self):
        while True:
            now = time() * 1000

            with self.lock:
                # Prevent the other thread from adding to fns while we're sorting
                # out the jobs to run now, and the jobs to keep for later

                to_run = [fn for fn, due in self.fns if due <= now]
                self.fns = [(fn, due) for (fn, due) in self.fns if due > now]

            # Run all the ready jobs outside the lock, so we don't keep it
            # locked longer than we have to
            for fn in to_run:
                fn()

            with self.lock:
                if not self.fns:
                    # If there are no more jobs, wait forever until a new job is 
                    # added in delay(), and notify_all() wakes us up again
                    self.condition.wait()
                else:
                    # Wait only until the soonest next job's due time.
                    ms_remaining = min(due for fn, due in self.fns) - time()*1000
                    if ms_remaining > 0:
                        self.condition.wait(ms_remaining / 1000)

    def delay(self, f, n):
        with self.lock:
            self.fns.append((f, time() * 1000 + n))

            # If the scheduler thread is currently waiting on the condition,
            # notify_all() will wake it up, so that it can consider the new job's
            # due time.
            self.condition.notify_all()

Upvotes: 6

gmds
gmds

Reputation: 19885

There are a few differences (in theory).

The first, and most important, I think, is that your solution can, effectively, only schedule one function at a time. So, for example, say you wanted to run a function f1 10 milliseconds from now, and another function f2 10 milliseconds after that.

You would not be able to do that easily, since something like schedulerX(f1, 10); schedulerX(f2, 10) would wait for f1 to finish running before starting the wait for f2. If f1 takes an hour, your scheduling of f2 will be totally wrong.

The intent of the second version is to, apparently, have the timer and each function running in a separate thread, so that one function call does not block another.

However, as others have noted in the comments, the imports are wrong, it takes a list of functions even though the problem specification says a function, and it doesn't actually work the way I described it, so more or less, there's no difference.

Upvotes: 0

Related Questions