OG Dude

Reputation: 936

Limiting concurrency and rate for Python threads

Given a number of threads, I want to limit the rate of calls to the worker function to, say, one per second.

My idea was to keep track of the last time a call was made across all threads and compare this to the current time in each thread. Then, if current_time - last_time < rate, I let the thread sleep for a bit. Something is wrong with my implementation; I presume I may have gotten the wrong idea about how locks work.

My code:

from Queue import Queue
from threading import Thread, Lock, RLock
import time

num_worker_threads = 2
rate = 1
q = Queue()
lock = Lock()
last_time = [time.time()]

def do_work(i, idx):
    # Do work here, print is just a dummy.
    print('Thread: {0}, Item: {1}, Time: {2}'.format(i, idx, time.time()))

def worker(i):
    while True:
        lock.acquire()
        current_time = time.time()
        interval = current_time - last_time[0]
        last_time[0] = current_time
        if interval < rate:
            time.sleep(rate - interval)
        lock.release()
        item = q.get()
        do_work(i, item)
        q.task_done()

for i in range(num_worker_threads):
    t = Thread(target=worker, args=[i])
    t.daemon = True
    t.start()

for item in xrange(10):
    q.put(item)

q.join()

I was expecting to see one call per second to do_work, however, I get mostly 2 calls at the same time (1 for each thread), followed by a one second pause. What is wrong?


Edit: The advice to simply throttle the rate at which items are put in the queue was good. However, I remembered that I also have to handle the case in which items are re-added to the queue by the workers. Canonical examples: pagination or back-off-and-retry in network tasks. I came up with the following. I guess that for actual network tasks the eventlet/gevent libraries may be easier on resources, but this is just an example. It uses a priority queue to pile up the requests and an extra thread to shovel items from the pile to the actual task queue at an even rate. I simulated re-insertion into the pile by the workers; re-inserted items are then treated first.

import sys
import os
import time
import random

from Queue import Queue, PriorityQueue
from threading import Thread

rate = 0.1

def worker(q, q_pile, idx):
    while True:
        item = q.get()
        # idx is the worker thread's index, item[1] is the payload
        print("Thread: {1} processed: {0}".format(item[1], idx))
        if random.random() > 0.3:
            print("Thread: {1} reinserting item: {0}".format(item[1], idx))
            q_pile.put((-1 * time.time(), item[1]))
        q.task_done()

def schedule(q_pile, q):
    while True:
        if not q_pile.empty():
            print("Items on pile: {0}".format(q_pile.qsize()))
            q.put(q_pile.get())
            q_pile.task_done()
        time.sleep(rate)

def main():

    q_pile = PriorityQueue()
    q = Queue()

    for i in range(5):
        t = Thread(target=worker, args=[q, q_pile, i])
        t.daemon = True
        t.start()

    t_schedule = Thread(target=schedule, args=[q_pile, q])
    t_schedule.daemon = True
    t_schedule.start()

    for i in range(10):
        q_pile.put((-1 * time.time(), i))
    q_pile.join()
    q.join()

if __name__ == '__main__':
    main()
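
To make the ordering explicit: PriorityQueue hands out the smallest entry first, so keying each entry on the negated timestamp means the most recently added item comes out first. Below is a minimal sketch in Python 3 (where the module is named queue rather than Queue). The fixed timestamps and item names are made up for illustration and stand in for time.time().

```python
import queue

pile = queue.PriorityQueue()

# Hypothetical timestamps standing in for time.time(); a larger value means
# the item was added later.
for stamp, name in [(100.0, "first"), (101.0, "second"), (102.0, "reinserted")]:
    # Negating the timestamp makes the newest entry the smallest key.
    pile.put((-stamp, name))

# Drain the pile: the newest entry is returned first.
order = [pile.get()[1] for _ in range(3)]
print(order)
```

If the opposite behaviour is wanted (oldest first), the timestamp can be used as the key without the negation.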

Upvotes: 5

Views: 5119

Answers (2)

Jochen Ritzel

Reputation: 107588

I get mostly 2 calls at the same time (1 for each thread), followed by a one second pause. What is wrong?

That's exactly what you should expect from your implementation. Let's say the time t starts at 0 and the rate is 1:

Thread1 does this:

    lock.acquire() # both threads wait here, one gets the lock
    current_time = time.time() # we start at t=0
    interval = current_time - last_time[0] # so interval = 0
    last_time[0] = current_time # last_time = t = 0
    if interval < rate: # rate = 1 so we sleep
        time.sleep(rate - interval) # to t=1
    lock.release() # now the other thread wakes up
    # it's t=1 and we do the job

Thread2 does this:

    lock.acquire() # we get the lock at t=1 
    current_time = time.time() # still t=1
    interval = current_time - last_time[0] # interval = 1
    last_time[0] = current_time
    if interval < rate: # interval = rate = 1 so we don't sleep
        time.sleep(rate - interval)
    lock.release() 
    # both threads start the work around t=1

My advice is to limit the speed at which the items are put into the queue.
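
A minimal sketch of that advice, written for Python 3 (module queue, not Queue). The names throttled_feed and run_throttled are hypothetical helpers and not part of the question's code. The workers here only record when they picked up each item, as a stand-in for real work.

```python
import queue
import threading
import time


def throttled_feed(items, q, interval):
    """Put items on q, pausing `interval` seconds after each put."""
    for item in items:
        q.put(item)
        time.sleep(interval)


def run_throttled(items, interval=1.0, num_workers=2):
    """Feed items to worker threads at a limited rate; return (item, time) pairs."""
    q = queue.Queue()
    stamps = []
    stamps_lock = threading.Lock()

    def worker():
        while True:
            item = q.get()
            # Stand-in for the real work: note when the item was handled.
            with stamps_lock:
                stamps.append((item, time.monotonic()))
            q.task_done()

    for _ in range(num_workers):
        threading.Thread(target=worker, daemon=True).start()

    # The producer is the only place that knows about the rate, so the
    # workers need no shared lock or shared timestamp.
    throttled_feed(items, q, interval)
    q.join()
    return stamps


if __name__ == "__main__":
    for item, stamp in run_throttled(range(5), interval=1.0):
        print("Item: {0}, Time: {1:.2f}".format(item, stamp))
```

Since items only become available once per interval, the workers cannot start them any faster, no matter how many threads there are.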

Upvotes: 1

Russell Borogove

Reputation: 19037

It seems weird to me to try and limit the rate across multiple threads. If you limit each thread independently you can avoid all the locking nonsense.

Just a guess, but I think you want to set last_time[0] to time.time() (not current_time) after the sleep.
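
A rough sketch of that change, assuming you do want one limit shared by all threads. RateGate is a hypothetical helper name, and the code is written for Python 3. The lock is held during the sleep, as in the question, but the timestamp is recorded after the sleep, so the next thread measures its interval from the moment the previous one was actually let through.

```python
import threading
import time


class RateGate:
    """Let callers through no more often than once every `interval` seconds."""

    def __init__(self, interval):
        self.interval = interval
        self.lock = threading.Lock()
        self.last_time = None  # None until the first caller has passed

    def wait(self):
        """Block until this caller may proceed; return the time it passed."""
        with self.lock:
            if self.last_time is not None:
                remaining = self.interval - (time.monotonic() - self.last_time)
                if remaining > 0:
                    time.sleep(remaining)
            # Record the time *after* any sleep, not the time read before it.
            self.last_time = time.monotonic()
            return self.last_time


if __name__ == "__main__":
    gate = RateGate(1.0)

    def worker(i):
        for n in range(3):
            stamp = gate.wait()
            print("Thread: {0}, Call: {1}, Time: {2:.2f}".format(i, n, stamp))

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

In a queue-based worker it is probably better to call gate.wait() after q.get() rather than before it, so that a thread does not use up a slot and then block on an empty queue.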

Upvotes: 0
