Wakan Tanka

Reputation: 8052

Writing to a CSV file using map_async

I have the following code:

#!/usr/bin/env python

def do_job(row):
  # COMPUTING INTENSIVE OPERATION
  sleep(1)
  row.append(int(row[0])**2)

  # WRITING TO FILE - ATOMICITY ENSURED
  semaphore.acquire()
  print "Inside semaphore before writing to file: (%s,%s,%s)" % (row[0], row[1], row[2])
  csvWriter.writerow(row)
  print "Inside semaphore after writing to file"
  semaphore.release()

  # RETURNING VALUE
  return row

def parallel_csv_processing(inputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):
  # OPEN FH FOR READING INPUT FILE
  inputFH   = open(inputFile,  "rb")
  csvReader = csv.reader(inputFH, delimiter=separator)

  # SKIP HEADERS
  for skip in xrange(skipRows):
    csvReader.next()

  # WRITE HEADER TO OUTPUT FILE
  csvWriter.writerow(header)

  # COMPUTING INTENSIVE OPERATIONS
  try:
    p = Pool(processes = cpuCount)
    # results = p.map(do_job, csvReader, chunksize = 10)
    results = p.map_async(do_job, csvReader, chunksize = 10)

  except KeyboardInterrupt:
    p.close()
    p.terminate()
    p.join()

  # WAIT FOR RESULTS
  # results.get()
  p.close()
  p.join()

  # CLOSE FH FOR READING INPUT
  inputFH.close()

if __name__ == '__main__':
  import csv
  from time import sleep
  from multiprocessing import Pool
  from multiprocessing import cpu_count
  from multiprocessing import current_process
  from multiprocessing import Semaphore
  from pprint import pprint as pp
  import calendar
  import time

  SCRIPT_START_TIME  = calendar.timegm(time.gmtime())
  inputFile  = "input.csv"
  outputFile = "output.csv"
  semaphore = Semaphore(1)

  # OPEN FH FOR WRITING OUTPUT FILE
  outputFH  = open(outputFile, "wt")
  csvWriter = csv.writer(outputFH, lineterminator='\n')
  csvWriter.writerow(["before","calling","multiprocessing"])
  parallel_csv_processing(inputFile, cpuCount = cpu_count())
  csvWriter.writerow(["after","calling","multiprocessing"])

  # CLOSE FH FOR WRITING OUTPUT
  outputFH.close()

  SCRIPT_STOP_TIME   = calendar.timegm(time.gmtime())
  SCRIPT_DURATION    = SCRIPT_STOP_TIME - SCRIPT_START_TIME
  print "Script duration:    %s seconds" % SCRIPT_DURATION

After running, the output on the terminal is the following:

Inside semaphore before writing to file: (0,0,0)
Inside semaphore after writing to file
Inside semaphore before writing to file: (1,3,1)
Inside semaphore after writing to file
Inside semaphore before writing to file: (2,6,4)
Inside semaphore after writing to file
Inside semaphore before writing to file: (3,9,9)
Inside semaphore after writing to file
Inside semaphore before writing to file: (4,12,16)
Inside semaphore after writing to file
Inside semaphore before writing to file: (5,15,25)
Inside semaphore after writing to file
Inside semaphore before writing to file: (6,18,36)
Inside semaphore after writing to file
Inside semaphore before writing to file: (7,21,49)
Inside semaphore after writing to file
Inside semaphore before writing to file: (8,24,64)
Inside semaphore after writing to file
Inside semaphore before writing to file: (9,27,81)
Inside semaphore after writing to file
Script duration:    10 seconds

The content of input.csv is the following:

0,0
1,3
2,6
3,9
4,12
5,15
6,18
7,21
8,24
9,27

The created content of output.csv is the following:

before,calling,multiprocessing
Default,header,please,change
after,calling,multiprocessing

Why is nothing written to output.csv from parallel_csv_processing, or rather from the do_job method?

Upvotes: 0

Views: 1145

Answers (1)

Your worker processes are silently failing with an exception. Specifically, the spawned processes don't have a value for csvWriter: each one runs in a separate Python interpreter and has not executed the __main__ block. That is deliberate; you don't want the subprocesses to re-run it. The do_job() function can only access values you pass to it explicitly in the map_async() call, and you aren't passing csvWriter. Even if you were, I'm not sure it would work; I don't know whether file handles are shared between the main process and the processes created by multiprocessing.

Put a try/except around the code in do_job and you will see the exception.

def do_job(row):
  try:
      # COMPUTING INTENSIVE OPERATION
      sleep(1)
      row.append(int(row[0])**2)

      # WRITING TO FILE - ATOMICITY ENSURED
      semaphore.acquire()
      print "Inside semaphore before writing to file: (%s,%s,%s)" % (row[0], row[1], row[2])
      csvWriter.writerow(row)
      print "Inside semaphore after writing to file"
      semaphore.release()

      # RETURNING VALUE
      return row
  except:
      print "exception"

Obviously, in real code the exception should be handled properly, but if you run this you will now see "exception" printed for every invocation of do_job.
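If you want to see which exception is actually raised rather than just the word "exception", one option (an illustrative variation, not part of the answer above) is to print the full traceback in the except block:

import traceback

def do_job(row):
  try:
      # COMPUTING INTENSIVE OPERATION
      sleep(1)
      row.append(int(row[0])**2)

      # WRITING TO FILE - ATOMICITY ENSURED
      semaphore.acquire()
      csvWriter.writerow(row)
      semaphore.release()

      # RETURNING VALUE
      return row
  except Exception:
      # PRINT THE FULL TRACEBACK, INCLUDING THE NAME THAT FAILED TO RESOLVE
      traceback.print_exc()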

Look in the multiprocessing documentation for more guidance, under the heading "16.6.1.4. Sharing state between processes" in the Python 2.7 standard library docs.
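For completeness, here is a minimal sketch of one way to restructure the original code along these lines (assuming Python 2, as in the question, and a changed parallel_csv_processing signature that also takes the output file name): let do_job only compute and return the row, and do all CSV writing in the parent process, where the writer and file handle actually exist, after collecting the map_async() results.

def do_job(row):
  # COMPUTING INTENSIVE OPERATION - no file access in the worker
  sleep(1)
  row.append(int(row[0])**2)

  # RETURN THE RESULT TO THE PARENT PROCESS
  return row

def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows=0, cpuCount=1):
  # OPEN FH FOR READING INPUT FILE IN THE PARENT
  inputFH   = open(inputFile, "rb")
  csvReader = csv.reader(inputFH, delimiter=separator)

  # SKIP HEADERS
  for skip in xrange(skipRows):
    csvReader.next()

  # RUN THE WORKERS AND WAIT FOR ALL RESULTS
  p = Pool(processes=cpuCount)
  results = p.map_async(do_job, csvReader, chunksize=10)
  p.close()
  p.join()
  inputFH.close()

  # WRITE EVERYTHING FROM THE PARENT, WHERE THE FILE HANDLE LIVES
  outputFH  = open(outputFile, "wt")
  csvWriter = csv.writer(outputFH, lineterminator='\n')
  csvWriter.writerow(header)
  csvWriter.writerows(results.get())
  outputFH.close()

Because only the parent touches output.csv, the semaphore is no longer needed; the workers just return rows and the parent writes them in one place.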

Upvotes: 1
