Reputation: 11508
I'm new to Python and I'm having trouble understanding how threading works. From skimming the documentation, my understanding is that calling join() on a thread is the recommended way of blocking until it completes.
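For instance, my mental model is roughly this (a toy sketch with a made-up worker function, not my actual code):

from threading import Thread
from time import sleep

def worker():
    sleep(2)  # stand-in for the real file processing
    print "worker done"

t = Thread(target=worker)
t.start()
t.join()  # blocks here until worker() has returned
print "main thread continues only after the worker has finished"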
To give a bit of background, I have 48 large CSV files (multiple GB) which I am trying to parse in order to find inconsistencies. The threads share no state. This can be done single-threaded in a reasonable amount of time for a one-off, but I am trying to do it concurrently as an exercise.
Here's a skeleton of the file processing:
import sys
from threading import Thread

def process_file(data_file):
    with open(data_file) as f:
        print "Start processing {0}".format(data_file)
        line = f.readline()
        while line:
            # logic omitted for brevity; can post if required
            # pretty certain it works as expected, single 'thread' works fine
            # (error_count is maintained by the omitted logic)
            line = f.readline()
    print "Finished processing file {0} with {1} errors".format(data_file, error_count)

def process_file_callable(data_file):
    try:
        process_file(data_file)
    except:
        print >> sys.stderr, "Error processing file {0}".format(data_file)
And the concurrent bit:
def partition_list(l, n):
    """ Yield successive n-sized partitions from a list.
    """
    for i in xrange(0, len(l), n):
        yield l[i:i + n]

partitions = list(partition_list(data_files, 4))

for partition in partitions:
    threads = []
    for data_file in partition:
        print "Processing file {0}".format(data_file)
        t = Thread(name=data_file, target=process_file_callable, args=(data_file,))
        threads.append(t)
        t.start()
    for t in threads:
        print "Joining {0}".format(t.getName())
        t.join(5)
    print "Joined the first chunk of {0}".format(map(lambda t: t.getName(), threads))
I run this as:
python -u datautils/cleaner.py > cleaner.out 2> cleaner.err
My understanding is that join() should block the calling thread, waiting for the thread it's called on to finish; however, the behaviour I'm observing is inconsistent with that expectation.
I never see errors in the error file, but I also never see the expected log messages on stdout.
The parent process does not terminate unless I explicitly kill it from the shell. If I check how many prints I have for "Finished ...", it's never the expected 48, but somewhere between 12 and 15. However, having compared against a single-threaded run, I can confirm that the multithreaded run is actually processing everything and doing all the expected validation; it just does not seem to terminate cleanly.
I know I must be doing something wrong, but I would really appreciate it if you could point me in the right direction.
Upvotes: 3
Views: 4634
Reputation: 11508
Thanks everybody for your input and sorry for not replying sooner - I'm working on this on and off as a hobby project.
I've managed to write a simple example that proves it was my bad:
from itertools import groupby
from threading import Thread
from random import randint
from time import sleep

for key, partition in groupby(range(1, 50), lambda k: k // 10):
    threads = []
    for idx in list(partition):
        thread_name = 'thread-%d' % idx
        t = Thread(name=thread_name, target=sleep, args=(randint(1, 5),))
        threads.append(t)
        print 'Starting %s' % t.getName()
        t.start()
    for t in threads:
        print 'Joining %s' % t.getName()
        t.join()
    print 'Joined the first group of %s' % map(lambda t: t.getName(), threads)
The reason it was failing initially was the while loop: the 'logic omitted for brevity' was working fine, but some of the input files being fed in were corrupted (had jumbled lines) and the logic went into an infinite loop on them. That is why some threads were never joined. The timeout on the join meant the outer loop kept moving, so all the threads did get started, but the stuck ones never finished; hence the inconsistency between 'starting' and 'joining'. The other fun fact was that the corruption was on the last line, so all the expected data was still being processed.
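For anyone hitting the same thing: a timed join() returns whether or not the thread finished, so the caller has to check is_alive() afterwards. Roughly something like this (an illustrative sketch, not the code I actually ran):

for t in threads:
    t.join(5)
    if t.is_alive():
        # the timed join() returned, but the thread is still stuck in its loop
        print 'WARNING: %s did not finish within the timeout' % t.getName()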
Thanks again for your advice - the comment about processing the files in a while loop instead of the pythonic way pointed me in the right direction, and yes, threading behaves as expected.
Upvotes: 4
Reputation: 3919
I can't see where the mistake in your code is, but I can recommend refactoring it a little bit. First of all, threading in Python is not truly parallel: because of the Global Interpreter Lock, only one thread can execute Python code at a time. That's why I recommend you use the multiprocessing module:
from multiprocessing import Pool, cpu_count

pool = Pool(cpu_count())
for partition in partition_list(data_files, 4):
    res = pool.map(process_file_callable, partition)
    print res
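Since the Pool already limits the number of worker processes to cpu_count(), the manual partitioning is not really needed either. A fuller sketch (assuming data_files and process_file_callable are defined as in your question; the __main__ guard matters on Windows):

from multiprocessing import Pool, cpu_count

if __name__ == '__main__':
    pool = Pool(cpu_count())
    try:
        # map() blocks until every file has been processed
        results = pool.map(process_file_callable, data_files)
        print results
    finally:
        pool.close()
        pool.join()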
Second, you are not reading the file in a pythonic way:
with open(...) as f:
    line = f.readline()
    while line:
        ...  # do(line)
        line = f.readline()
Here is the pythonic way:
with open(...) as f:
    for line in f:
        ...  # do(line)
"This is memory efficient, fast, and leads to simple code." (from the Python docs)
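Applied to your skeleton it would look roughly like this (a sketch only; the error_count handling depends on your omitted logic):

def process_file(data_file):
    error_count = 0
    with open(data_file) as f:
        print "Start processing {0}".format(data_file)
        for line in f:
            # your validation logic goes here, updating error_count
            pass
    print "Finished processing file {0} with {1} errors".format(data_file, error_count)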
By the way, I have only one hypothesis about what might happen to your program when run multithreaded: the app becomes slower, because unordered access to the hard disk drive is significantly slower than ordered access. You can try to check this hypothesis using iostat or htop, if you are using Linux.
If your app does not finish its work and does not appear to be doing anything in a process monitor (no CPU or disk activity), it means you have some kind of deadlock or blocked access to the same resource.
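One way to check that from inside the program is to dump where every thread currently is. A rough sketch (uses sys._current_frames(), available in Python 2.6+; call it from the main thread, for example after a timed join):

import sys
import threading
import traceback

def dump_thread_stacks():
    # print the current stack of every live thread to stderr
    names = dict((t.ident, t.getName()) for t in threading.enumerate())
    for ident, frame in sys._current_frames().items():
        print >> sys.stderr, '--- %s ---' % names.get(ident, ident)
        traceback.print_stack(frame, file=sys.stderr)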
Upvotes: 4