GIS-Jonathan

Reputation: 4647

Convert multi-threaded Python code to multi-process code using concurrent.futures

I have the following working code (Python 3.5), which uses concurrent.futures to parse files in multiple threads and then does some post-processing on the results as they come back (in any order).

from concurrent import futures
with futures.ThreadPoolExecutor(max_workers=4) as executor:
    # A dictionary mapping each future (key) to the filename it was submitted for (value)
    jobs = {}

    # Loop through the files, and run the parse function for each file, sending the file-name to it, along with the kwargs of parser_variables.
    # The results of the functions can come back in any order.
    for this_file in files_list:
        job = executor.submit(parse_log_file.parse, this_file, **parser_variables)
        jobs[job] = this_file

    # Get the completed jobs whenever they are done
    for job in futures.as_completed(jobs):
        debug.checkpointer("Multi-threaded Parsing File finishing")

        # Fetch the job's result (job.result()) and the file it was submitted for (jobs[job])
        result_content = job.result()
        this_file = jobs[job]

I want to convert this to use processes instead of threads, because the threaded version doesn't offer any speedup. In theory I just need to change ThreadPoolExecutor to ProcessPoolExecutor. The problem is that when I do, I get this exception:

Process Process-2:
Traceback (most recent call last):
  File "C:\Python35\lib\multiprocessing\process.py", line 254, in _bootstrap
    self.run()
  File "C:\Python35\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Python35\lib\concurrent\futures\process.py", line 169, in _process_worker
    call_item = call_queue.get(block=True)
  File "C:\Python35\lib\multiprocessing\queues.py", line 113, in get
    return ForkingPickler.loads(res)
TypeError: Required argument 'fileno' (pos 1) not found
Traceback (most recent call last):
  File "c:/myscript/main.py", line 89, in <module>
    main()
  File "c:/myscript/main.py", line 59, in main
    system_counters = process_system(system, filename)
  File "c:\myscript\per_system.py", line 208, in process_system
    system_counters = process_filelist(**file_handling_variables)
  File "c:\myscript\per_logfile.py", line 31, in process_filelist
    results_list = job.result()
  File "C:\Python35\lib\concurrent\futures\_base.py", line 398, in result
    return self.__get_result()
  File "C:\Python35\lib\concurrent\futures\_base.py", line 357, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

I think this might have something to do with pickling, but searching for the error message hasn't turned up anything.
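One way to test the pickling hypothesis is to round-trip each keyword argument through pickle in the parent process, before anything is handed to the pool. This is only a diagnostic sketch: find_unpicklable and example_variables are hypothetical names, and the lambda merely stands in for whatever object in parser_variables refuses to pickle.

```python
import pickle


def find_unpicklable(kwargs):
    """Return {name: error} for every value that fails a pickle round trip."""
    failures = {}
    for name, value in kwargs.items():
        try:
            # ProcessPoolExecutor pickles arguments in the parent and
            # unpickles them in the worker, so exercise both directions.
            pickle.loads(pickle.dumps(value))
        except Exception as exc:
            failures[name] = exc
    return failures


# Hypothetical stand-in for parser_variables: a lambda cannot be pickled.
example_variables = {"delimiter": ",", "callback": lambda line: line}

for name, error in find_unpicklable(example_variables).items():
    print(name, "->", repr(error))
```

Any name it reports is a candidate for the argument that breaks the worker process.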

How do I convert the above to use multiple processes?

Upvotes: 1

Views: 497

Answers (1)

GIS-Jonathan

Reputation: 4647

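It turns out this happens because one of the values I'm passing in parser_variables is a class (a reader from a third-party module). ProcessPoolExecutor has to pickle every argument in order to send it to a worker process, and the traceback shows the failure occurring while the worker unpickles the call (ForkingPickler.loads). If I remove the reader from parser_variables, the code above works fine with ProcessPoolExecutor. pickle apparently can't round-trip this particular object.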
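If the reader is still needed, one possible workaround is to pass only picklable settings to the worker and build the reader inside the worker function, so the object itself never crosses the process boundary. The following is a minimal sketch under assumptions: LogReader, parse, parse_all and reader_encoding are hypothetical stand-ins, not the real third-party reader or the original parse_log_file.parse.

```python
import sys
from concurrent import futures


class LogReader:
    """Hypothetical stand-in for the third-party reader class."""

    def __init__(self, encoding):
        self.encoding = encoding

    def read(self, path):
        with open(path, encoding=self.encoding) as handle:
            return handle.read()


def parse(path, reader_encoding="utf-8"):
    # Build the reader inside the worker process; only the plain string
    # reader_encoding has to be pickled and sent across.
    reader = LogReader(reader_encoding)
    return len(reader.read(path).splitlines())


def parse_all(files_list, **parser_variables):
    results = {}
    with futures.ProcessPoolExecutor(max_workers=4) as executor:
        # Map each future to the file it was submitted for
        jobs = {
            executor.submit(parse, this_file, **parser_variables): this_file
            for this_file in files_list
        }
        for job in futures.as_completed(jobs):
            results[jobs[job]] = job.result()
    return results


if __name__ == "__main__":
    # The guard matters on Windows, where worker processes re-import this module.
    print(parse_all(sys.argv[1:], reader_encoding="utf-8"))
```

The worker function also has to be defined at module level so that it can be pickled by name; whether constructing the reader once per file is acceptable depends on how expensive the real reader is to create.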

Upvotes: 1
