Reputation: 8052
I have the following code:
#!/usr/bin/env python

def do_job(row):
    # COMPUTING INTENSIVE OPERATION
    sleep(1)
    row.append(int(row[0])**2)

    # WRITING TO FILE - ATOMICITY ENSURED
    semaphore.acquire()
    print "Inside semaphore before writing to file: (%s,%s,%s)" % (row[0], row[1], row[2])
    csvWriter.writerow(row)
    print "Inside semaphore after writing to file"
    semaphore.release()

    # RETURNING VALUE
    return row

def parallel_csv_processing(inputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):
    # OPEN FH FOR READING INPUT FILE
    inputFH = open(inputFile, "rb")
    csvReader = csv.reader(inputFH, delimiter=separator)

    # SKIP HEADERS
    for skip in xrange(skipRows):
        csvReader.next()

    # WRITE HEADER TO OUTPUT FILE
    csvWriter.writerow(header)

    # COMPUTING INTENSIVE OPERATIONS
    try:
        p = Pool(processes = cpuCount)
        # results = p.map(do_job, csvReader, chunksize = 10)
        results = p.map_async(do_job, csvReader, chunksize = 10)
    except KeyboardInterrupt:
        p.close()
        p.terminate()
        p.join()

    # WAIT FOR RESULTS
    # results.get()
    p.close()
    p.join()

    # CLOSE FH FOR READING INPUT
    inputFH.close()

if __name__ == '__main__':
    import csv
    from time import sleep
    from multiprocessing import Pool
    from multiprocessing import cpu_count
    from multiprocessing import current_process
    from multiprocessing import Semaphore
    from pprint import pprint as pp
    import calendar
    import time

    SCRIPT_START_TIME = calendar.timegm(time.gmtime())

    inputFile = "input.csv"
    outputFile = "output.csv"

    semaphore = Semaphore(1)

    # OPEN FH FOR WRITING OUTPUT FILE
    outputFH = open(outputFile, "wt")
    csvWriter = csv.writer(outputFH, lineterminator='\n')

    csvWriter.writerow(["before","calling","multiprocessing"])
    parallel_csv_processing(inputFile, cpuCount = cpu_count())
    csvWriter.writerow(["after","calling","multiprocessing"])

    # CLOSE FH FOR WRITING OUTPUT
    outputFH.close()

    SCRIPT_STOP_TIME = calendar.timegm(time.gmtime())
    SCRIPT_DURATION = SCRIPT_STOP_TIME - SCRIPT_START_TIME
    print "Script duration: %s seconds" % SCRIPT_DURATION
After running it, the output on the terminal is the following:
Inside semaphore before writing to file: (0,0,0)
Inside semaphore after writing to file
Inside semaphore before writing to file: (1,3,1)
Inside semaphore after writing to file
Inside semaphore before writing to file: (2,6,4)
Inside semaphore after writing to file
Inside semaphore before writing to file: (3,9,9)
Inside semaphore after writing to file
Inside semaphore before writing to file: (4,12,16)
Inside semaphore after writing to file
Inside semaphore before writing to file: (5,15,25)
Inside semaphore after writing to file
Inside semaphore before writing to file: (6,18,36)
Inside semaphore after writing to file
Inside semaphore before writing to file: (7,21,49)
Inside semaphore after writing to file
Inside semaphore before writing to file: (8,24,64)
Inside semaphore after writing to file
Inside semaphore before writing to file: (9,27,81)
Inside semaphore after writing to file
Script duration: 10 seconds
The content of input.csv is the following:
0,0
1,3
2,6
3,9
4,12
5,15
6,18
7,21
8,24
9,27
The created content of output.csv is the following:
before,calling,multiprocessing
Default,header,please,change
after,calling,multiprocessing
Why is nothing written to output.csv from parallel_csv_processing, i.e. from the do_job method?
Upvotes: 0
Views: 1145
Reputation: 6826
Your processes are failing silently with an exception. Specifically, the spawned processes don't have a value for csvWriter, because each one runs in a separate Python interpreter and hasn't executed the __main__ block - which is deliberate; you don't want the subprocesses to run it. The do_job() function can only access values you pass to it explicitly in the map_async() call, and you aren't passing csvWriter. Even if you were, I'm not sure it would work - I don't know whether file handles are shared between the main process and the processes created by multiprocessing.
Put a try/except around the code in do_job and you will see the exception.
def do_job(row):
    try:
        # COMPUTING INTENSIVE OPERATION
        sleep(1)
        row.append(int(row[0])**2)

        # WRITING TO FILE - ATOMICITY ENSURED
        semaphore.acquire()
        print "Inside semaphore before writing to file: (%s,%s,%s)" % (row[0], row[1], row[2])
        csvWriter.writerow(row)
        print "Inside semaphore after writing to file"
        semaphore.release()

        # RETURNING VALUE
        return row
    except:
        print "exception"
Obviously in real code the exception should be handled properly, but if you run this you'll now see "exception" printed for every invocation of do_job.
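If you want to see what the exception actually is rather than a fixed string, a small variation is to print the full traceback from the standard traceback module inside the except clause (body trimmed here for brevity):

import traceback

def do_job(row):
    try:
        # COMPUTING INTENSIVE OPERATION
        sleep(1)
        row.append(int(row[0])**2)

        # WRITING TO FILE - ATOMICITY ENSURED
        semaphore.acquire()
        csvWriter.writerow(row)
        semaphore.release()
        return row
    except Exception:
        # format_exc() includes the exception type, message and stack trace,
        # so each worker tells you exactly which line failed and why
        print traceback.format_exc()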
Look in the multiprocessing documentation for more guidance - see the heading "16.6.1.4. Sharing state between processes" in the Python 2.7 Standard Library docs.
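If all you really need is the computed rows in output.csv, the simplest change is to not share anything at all: let do_job only compute and return the row, and do every write in the parent process after the pool has finished. A rough sketch along those lines (keeping your names and the __main__ block as-is, dropping the semaphore since only one process ever writes, and swapping map_async() for the blocking map() so the results are ready when it returns):

def do_job(row):
    # COMPUTING INTENSIVE OPERATION - runs in a worker process, no file access here
    sleep(1)
    row.append(int(row[0])**2)
    return row

def parallel_csv_processing(inputFile, header=["Default", "header", "please", "change"], separator=",", skipRows=0, cpuCount=1):
    # OPEN FH FOR READING INPUT FILE
    inputFH = open(inputFile, "rb")
    csvReader = csv.reader(inputFH, delimiter=separator)

    # SKIP HEADERS AND WRITE OUTPUT HEADER (csvWriter is the global created in __main__)
    for skip in xrange(skipRows):
        csvReader.next()
    csvWriter.writerow(header)

    # COMPUTE IN PARALLEL, COLLECT THE RESULTS IN THE PARENT
    p = Pool(processes=cpuCount)
    results = p.map(do_job, csvReader, chunksize=10)   # blocks until every row is processed
    p.close()
    p.join()
    inputFH.close()

    # WRITE EVERYTHING HERE, IN THE PARENT, where csvWriter actually exists
    for row in results:
        csvWriter.writerow(row)

With that change the parent's outputFH is the only handle that ever gets written to, so the rows end up in output.csv when the parent closes it.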
Upvotes: 1