dman

Reputation: 11073

Python Queue and Threading Module - Impose an extra custom lock?

Using Linux and Python 2.7.6, I have a script that uploads lots of files at one time. I am using multi-threading with the Queue and Threading modules.

I have an object that keeps track of the files that have been successfully uploaded; it decrements a counter after each successful upload. I need to make this operation atomic/thread-safe. Since the Queue module is high level and has its own mutex at the lower level, can I impose my own lock/acquire in addition to it? I tried doing this and had no errors (at the bottom of the last code block, where file_quantity.deduct() is), but I am not sure if it is truly working as it should. Here is the shortened version for readability:

import sys
import time
import threading
from threading import Thread
import Queue

import requests

class FileQuantity(object):
    """Keeps track of files that have been uploaded and how many are left"""

    def __init__(self, file_quantity):
        self.quantity = file_quantity
        self.total = file_quantity

    def deduct(self):
        self.quantity -= 1

kill_received = False
lock = threading.Lock()

class CustomQueue(Queue.Queue):
    # Cannot use the stock .join() because it would block SIGINT
    # processing until the threads are done. To counter this, wait()
    # is given a timeout, and "while not kill_received" is checked
    # on each pass.

    def join(self):
        self.all_tasks_done.acquire()
        try:
            while not kill_received and self.unfinished_tasks:
                self.all_tasks_done.wait(10.0)
        finally:
            self.all_tasks_done.release()
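
For completeness, kill_received gets set elsewhere by a SIGINT handler; that code is omitted from this shortened version, but a simplified sketch of the idea (handler name is hypothetical) looks like this:

import signal

def sigint_handler(signum, frame):
    # Hypothetical handler: flips the flag so the worker loops
    # and the custom join() can notice it and exit.
    global kill_received
    kill_received = True

signal.signal(signal.SIGINT, sigint_handler)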


def do_the_uploads(file_list, file_quantity,
                   retry_list, authenticate):
    """The uploading engine"""
    value = raw_input(
        "\nPlease enter how many concurrent "
        "uploads you want at one time (example: 200)> ")
    value = int(value)
    logger.info('{} concurrent uploads will be used.'.format(value))

    confirm = raw_input(
        "\nProceed to upload files? Enter [Y/y] for yes: ").upper()
    if confirm == "Y":
        global kill_received
        kill_received = False
        sys.stdout.write("\x1b[2J\x1b[H")
        q = CustomQueue()

        def worker():
            global kill_received
            while not kill_received:
                item = q.get()
                upload_file(item, file_quantity, retry_list, authenticate, q)
                q.task_done()

        for i in range(value):
            t = Thread(target=worker)
            t.setDaemon(True)
            t.start()

        for item in file_list:
            q.put(item)

        q.join()

        print "Finished. Cleaning up processes...",
        #Allowing the threads to cleanup
        time.sleep(4)
        print "done."


def upload_file(file_obj, file_quantity, retry_list, authenticate, q):
    """Uploads a file. One file per it's own thread. No batch style. This way if one upload
       fails no others are effected."""
    absolute_path_filename, filename, dir_name, token, url = file_obj
    url = url + dir_name + '/' + filename
    try:
        with open(absolute_path_filename) as f:
            r = requests.put(url, data=f, headers=header_collection, timeout=20)
    except requests.exceptions.ConnectionError:
        return  # r would be undefined below if the connection failed

    # src_md5 is computed elsewhere (omitted from this shortened version)
    if src_md5 == r.headers['etag']:
        lock.acquire()
        file_quantity.deduct()
        lock.release()

Upvotes: 0

Views: 301

Answers (1)

Tim Peters

Reputation: 70735

Well, the code you posted relies on a single module-level lock, so it's hard to say for sure whether every mutation is protected. It would be more common to protect the code that actually needs protecting:

def deduct(self):
    with lock:
        self.quantity -= 1
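
(The with statement here is just shorthand for the usual acquire/try/finally dance, so the lock is released even if the body raises:)

def deduct(self):
    lock.acquire()
    try:
        self.quantity -= 1
    finally:
        lock.release()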

Sanest is to allocate a lock in the structure that needs it, like so:

class FileQuantity(object):
    """Keeps track of files that have been uploaded and how many are left"""

    def __init__(self, file_quantity):
        self.quantity = file_quantity
        self.total = file_quantity
        self.lock = threading.Lock()

    def deduct(self):
        with self.lock:
            self.quantity -= 1

and use self.lock similarly for any other mutations of FileQuantity data members that may be invoked simultaneously by multiple threads.
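
For illustration, a minimal sketch driving the locked class from several threads (the thread and iteration counts are arbitrary):

import threading

fq = FileQuantity(1000)

def drain():
    for _ in range(100):
        fq.deduct()

threads = [threading.Thread(target=drain) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# 10 threads x 100 deductions: quantity falls from 1000 to 0,
# deterministically, because deduct() serializes each decrement.
assert fq.quantity == 0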

Upvotes: 1
