Reputation: 11073
Using Linux and Python 2.7.6, I have a script that uploads lots of files at one time. I am using multi-threading with the Queue and Threading modules.
I have an object that keeps track of the files that have been successfully uploaded and decrements after each successful upload. I need to make this operation atomic/thread-safe. Since the Queue module is high level and has its own mutex at the lower level, can I impose my own lock/acquire in addition to it? I tried doing this and had no errors (at the bottom of the last code block, where file_quantity.deduct()
is). But I am not sure if it is truly working as it should. Here is a shortened version for readability:
class FileQuantity(object):
    """Keeps track of files that have been uploaded and how many are left"""

    def __init__(self, file_quantity):
        self.quantity = file_quantity
        self.total = file_quantity

    def deduct(self):
        self.quantity -= 1

kill_received = False
lock = threading.Lock()
class CustomQueue(Queue.Queue):
    # Cannot use .join() because it would block any processing
    # of SIGINT until threads are done. To counter this,
    # wait() is given a timeout along with a "while not kill_received"
    # check.
    def join(self):
        self.all_tasks_done.acquire()
        try:
            while not kill_received and self.unfinished_tasks:
                self.all_tasks_done.wait(10.0)
        finally:
            self.all_tasks_done.release()
def do_the_uploads(file_list, file_quantity,
                   retry_list, authenticate):
    """The uploading engine"""
    value = raw_input(
        "\nPlease enter how many concurrent "
        "uploads you want at one time (example: 200)> ")
    value = int(value)
    logger.info('{} concurrent uploads will be used.'.format(value))
    confirm = raw_input(
        "\nProceed to upload files? Enter [Y/y] for yes: ").upper()
    if confirm == "Y":
        kill_received = False
        sys.stdout.write("\x1b[2J\x1b[H")
        q = CustomQueue()

        def worker():
            global kill_received
            while not kill_received:
                item = q.get()
                upload_file(item, file_quantity, retry_list, authenticate, q)
                q.task_done()

        for i in range(value):
            t = Thread(target=worker)
            t.setDaemon(True)
            t.start()

        for item in file_list:
            q.put(item)

        q.join()
        print "Finished. Cleaning up processes...",
        # Allow the threads to clean up
        time.sleep(4)
        print "done."
def upload_file(file_obj, file_quantity, retry_list, authenticate, q):
    """Uploads a file, one file per thread. No batch style. This way if one
    upload fails, no others are affected."""
    absolute_path_filename, filename, dir_name, token, url = file_obj
    url = url + dir_name + '/' + filename
    try:
        with open(absolute_path_filename) as f:
            r = requests.put(url, data=f, headers=header_collection, timeout=20)
    except requests.exceptions.ConnectionError as e:
        pass
    if src_md5 == r.headers['etag']:
        lock.acquire()
        file_quantity.deduct()
        lock.release()
Upvotes: 0
Views: 301
Reputation: 70735
Well, the code you posted doesn't define lock
anywhere, so it's hard to say for sure. It would be more common to protect only the code that actually needs protecting:
def deduct(self):
    with lock:
        self.quantity -= 1
Sanest is to allocate a lock in the structure that needs it, like so:
class FileQuantity(object):
    """Keeps track of files that have been uploaded and how many are left"""

    def __init__(self, file_quantity):
        self.quantity = file_quantity
        self.total = file_quantity
        self.lock = threading.Lock()

    def deduct(self):
        with self.lock:
            self.quantity -= 1
and use self.lock
similarly for any other mutations of FileQuantity
data members that may be invoked simultaneously by multiple threads.
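To sanity-check that the locked deduct() really is atomic, here is a small stress test; the thread and iteration counts are arbitrary, chosen just to hammer the counter from many threads at once:

```python
import threading

class FileQuantity(object):
    """Keeps track of files that have been uploaded and how many are left"""

    def __init__(self, file_quantity):
        self.quantity = file_quantity
        self.total = file_quantity
        self.lock = threading.Lock()

    def deduct(self):
        with self.lock:
            self.quantity -= 1

# 50 threads, each deducting 100 times: 5000 deductions total.
fq = FileQuantity(5000)

def hammer():
    for _ in range(100):
        fq.deduct()

threads = [threading.Thread(target=hammer) for _ in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(fq.quantity)  # 0 -- every deduction was accounted for
```

If the counter ends at exactly 0 every run, the decrements aren't being lost to races; without the lock, the read-modify-write in `self.quantity -= 1` is not guaranteed atomic.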
Upvotes: 1