Reputation: 384
This is for Python 3.x
I'm loading records from a CSV file in chunks of 300, then spawning worker threads to submit them to a REST API. I'm saving the HTTP response in a Queue, so that I can get a count of the number of skipped records once the entire CSV file is processed. However, after I added a Queue to my worker, the threads don't seem to close anymore. I want to monitor the number of threads for 2 reasons: (1) once all are done, I can calculate and display the skip counts and (2) I want to enhance my script to spawn no more than 20 or so threads, so I don't run out of memory.
I have 2 questions: (1) why do the processes stop closing once I add q.put() to the worker, and how do I know when they are all done, and (2) how can I limit the script to roughly 20 workers at a time?
Here is my code (somewhat simplified, because I can't share the exact details of the API I'm calling):
import requests, json, csv, time, datetime, multiprocessing
TEST_FILE = 'file.csv'
def read_test_data(path, chunksize=300):
    leads = []
    with open(path, 'rU') as data:
        reader = csv.DictReader(data)
        for index, row in enumerate(reader):
            if (index % chunksize == 0 and index > 0):
                yield leads
                del leads[:]
            leads.append(row)
        yield leads

def worker(leads, q):
    payload = {"action":"createOrUpdate","input":leads}
    r = requests.post(url, params=params, data=json.dumps(payload), headers=headers)
    q.put(r.text)  # this puts the response in a queue for later analysis
    return

if __name__ == "__main__":
    q = multiprocessing.Queue()  # this is a queue to put all HTTP responses in, so we count the skips
    jobs = []

    for leads in read_test_data(TEST_FILE):  # This function reads a CSV file and provides 300 records at a time
        p = multiprocessing.Process(target=worker, args=(leads, q,))
        jobs.append(p)
        p.start()

    time.sleep(20)  # checking if processes are closing automatically (they don't)
    print(len(multiprocessing.active_children()))  # always returns the number of threads. If I remove 'q.put' from worker, it returns 0

    # The intent is to wait until all workers are done, but it results in an infinite loop
    # when I remove 'q.put' in the worker it works fine
    #while len(multiprocessing.active_children()) > 0:
    #    time.sleep(1)

    skipped_count = 0
    while not q.empty():  # calculate number of skipped records based on the HTTP responses in the queue
        http_response = json.loads(q.get())
        for i in http_response['result']:
            if (i['status'] == "skipped" and i['reasons'][0]['code'] == "1004"):
                skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))
Upvotes: 3
Views: 5681
Reputation: 94881
This is most likely because of this documented quirk of multiprocessing.Queue:
Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the "feeder" thread to the underlying pipe. (The child process can call the cancel_join_thread() method of the queue to avoid this behaviour.)
This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
Basically, you need to make sure you get() all the items from a Queue to guarantee that all the processes which put something into that Queue will be able to exit.
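To illustrate the point with the question's own structure, here is a minimal sketch (my addition, not part of the answer's recommended approach): since each worker puts exactly one response on the queue, the parent can get() one item per job before joining, which lets every child flush its queue buffer and exit.

    # Sketch only: drain the queue before joining, so each child's feeder
    # thread can flush its buffer and the process can terminate.
    responses = [q.get() for _ in jobs]   # blocks until every worker has put its one response
    for p in jobs:
        p.join()                          # now safe: the queue has been drained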
I think in this case you're better off using a multiprocessing.Pool, and submitting all your jobs to multiprocessing.Pool.map. This simplifies things significantly, and gives you complete control over the number of processes running:
def worker(leads):
    payload = {"action":"createOrUpdate","input":leads}
    r = requests.post(url, params=params, data=json.dumps(payload), headers=headers)
    return r.text

if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count() * 2)  # cpu_count() * 2 processes running in the pool
    responses = pool.map(worker, read_test_data(TEST_FILE))

    skipped_count = 0
    for raw_response in responses:
        http_response = json.loads(raw_response)
        for i in http_response['result']:
            if (i['status'] == "skipped" and i['reasons'][0]['code'] == "1004"):
                skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))
If you're worried about the memory cost of converting read_test_data(TEST_FILE) into a list (which is required to use Pool.map), you can use Pool.imap instead.
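For instance, a minimal sketch of the imap variant (adapted from the code above; untested against the real API): responses are consumed lazily as each chunk finishes, so the generator is never expanded into a full list.

    pool = multiprocessing.Pool(multiprocessing.cpu_count() * 2)
    skipped_count = 0
    for raw_response in pool.imap(worker, read_test_data(TEST_FILE)):  # lazy: yields results as chunks complete
        http_response = json.loads(raw_response)
        for i in http_response['result']:
            if (i['status'] == "skipped" and i['reasons'][0]['code'] == "1004"):
                skipped_count += 1
    print("Number of records skipped: " + str(skipped_count))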
Edit:
As I mentioned in a comment above, this use-case looks like it's I/O-bound, which means you may see better performance by using a multiprocessing.dummy.Pool (which uses a thread pool instead of a process pool). Give both a try and see which is faster.
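As a rough sketch, only the import needs to change, since multiprocessing.dummy exposes the same Pool API backed by threads (the 20 here is just the cap mentioned in the question, not a recommendation):

    from multiprocessing.dummy import Pool as ThreadPool

    pool = ThreadPool(20)  # number of worker threads; 20 matches the limit the question mentions
    responses = pool.map(worker, read_test_data(TEST_FILE))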
Upvotes: 9