user124114

Reputation: 8692

How to use multiprocessing together with multithreading?

The CherryPy server uses threads to handle requests. One particular method in my threaded server is complex and CPU-heavy, so I need to use multiprocessing from inside the request thread to parallelize its execution.

I thought I'd just replace

class Server(object):
    @cherrypy.expose
    def expensive_method(self):
        ...
        x = map(fnc, args)
        ...

def fnc(args):
    # this method doesn't need CherryPy but is expensive to compute
    ...

cherrypy.quickstart(Server())

(which works fine) with

    def expensive_method(self):
        pool = Pool()
        x = pool.map(fnc, args)
        pool.terminate()

but that doesn't work. Even in the simpler case, when I don't use the pool at all,

    def expensive_method(self):
        pool = Pool()
        x = map(fnc, args) # <== no pool here! same as the working example
        pool.terminate()

I get an exception

[08/Jan/2013:20:05:33] ENGINE Caught signal SIGTERM.
2013-01-08 20:05:33,919 : INFO : _cplogging:201 : error(CP Server Thread-3) : [08/Jan/2013:20:05:33] ENGINE Caught signal SIGTERM.
[08/Jan/2013:20:05:33] ENGINE Bus STOPPING
2013-01-08 20:05:33,920 : INFO : _cplogging:201 : error(CP Server Thread-3) : [08/Jan/2013:20:05:33] ENGINE Bus STOPPING
[08/Jan/2013:20:05:38] ENGINE Error in 'stop' listener <bound method Server.stop of <cherrypy._cpserver.Server object at 0x1090c3c90>>
Traceback (most recent call last):
  File "/Volumes/work/workspace/vew/prj/lib/python2.7/site-packages/cherrypy/process/wspbus.py", line 197, in publish
    output.append(listener(*args, **kwargs))
  File "/Volumes/work/workspace/vew/prj/lib/python2.7/site-packages/cherrypy/process/servers.py", line 223, in stop
    wait_for_free_port(*self.bind_addr)
  File "/Volumes/work/workspace/vew/prj/lib/python2.7/site-packages/cherrypy/process/servers.py", line 410, in wait_for_free_port
    raise IOError("Port %r not free on %r" % (port, host))
IOError: Port 8888 not free on '127.0.0.1'

I think this happens at the end of the request, either after or during pool.terminate().

The forked worker processes don't do anything with the server or ports as such. Is there any way to tell CherryPy and/or multiprocessing to ignore the "server bits"? I don't need any ports or sockets there in fnc.

I need this to work on OSX + Linux, using Python 2.7.1 and CherryPy 3.2.2.


PROGRESS 1:

As per Sylvain's suggestion, I tried pool = Pool(initializer=cherrypy.server.unsubscribe). There are no more exceptions, everything works fine, but in the log I see

[08/Jan/2013:21:16:35] ENGINE Caught signal SIGTERM.
2013-01-08 21:16:35,908 : INFO : _cplogging:201 : error(CP Server Thread-10) : [08/Jan/2013:21:16:35] ENGINE Caught signal SIGTERM.
[08/Jan/2013:21:16:35] ENGINE Bus STOPPING
2013-01-08 21:16:35,909 : INFO : _cplogging:201 : error(CP Server Thread-10) : [08/Jan/2013:21:16:35] ENGINE Bus STOPPING
[08/Jan/2013:21:16:35] ENGINE Bus STOPPED
2013-01-08 21:16:35,909 : INFO : _cplogging:201 : error(CP Server Thread-10) : [08/Jan/2013:21:16:35] ENGINE Bus STOPPED
[08/Jan/2013:21:16:35] ENGINE Bus EXITING
2013-01-08 21:16:35,909 : INFO : _cplogging:201 : error(CP Server Thread-10) : [08/Jan/2013:21:16:35] ENGINE Bus EXITING
[08/Jan/2013:21:16:35] ENGINE Bus EXITED
2013-01-08 21:16:35,910 : INFO : _cplogging:201 : error(CP Server Thread-10) : [08/Jan/2013:21:16:35] ENGINE Bus EXITED

Is that alright? Could any trouble come from this (say, when serving multiple requests in different threads concurrently)?
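One possible explanation for the SIGTERM lines, offered as an assumption rather than something confirmed in this thread: workers forked from the running server inherit the parent's signal handlers, and Pool.terminate() stops workers by sending them SIGTERM, so an inherited handler would run inside each worker. The standard-library sketch below (Python 3, no CherryPy; parent_sigterm_handler, reset_sigterm, uses_default_handler, check_workers and demo are hypothetical names) shows the inheritance, and a pool initializer that restores the default handler in each worker.

```python
import multiprocessing
import signal


def parent_sigterm_handler(signum, frame):
    # Hypothetical stand-in for a handler the server installs in the parent.
    pass


def reset_sigterm():
    # Pool initializer: runs once per worker and drops the inherited handler.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)


def uses_default_handler(_):
    # Executed in a worker: report whether SIGTERM has the default action.
    return signal.getsignal(signal.SIGTERM) == signal.SIG_DFL


def check_workers(initializer=None):
    # "fork" is requested explicitly so workers copy the parent's state.
    ctx = multiprocessing.get_context("fork")
    pool = ctx.Pool(2, initializer=initializer)
    try:
        return pool.map(uses_default_handler, range(4))
    finally:
        # close() + join() lets workers exit on their own, without signals.
        pool.close()
        pool.join()


def demo():
    # Must run in the main thread, where signal handlers can be installed.
    previous = signal.signal(signal.SIGTERM, parent_sigterm_handler)
    try:
        return {
            "inherited": check_workers(),
            "reset": check_workers(reset_sigterm),
        }
    finally:
        if previous is not None:
            signal.signal(signal.SIGTERM, previous)


if __name__ == "__main__":
    print(demo())
```

On Python 2.7, multiprocessing.Pool on OSX and Linux always forks, so the get_context call is not needed there. The same idea would be passed as Pool(initializer=reset_sigterm).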


PROGRESS 2:

Actually, the above occasionally leaves idle processes behind :( So it doesn't work fine after all. The strange thing is that these idle processes were spawned by Pool, so they ought to be daemons, yet they stay alive even after the parent is killed.


PROGRESS 3:

I moved the forking (the Pool() call) out of the request-handling method, to a point after all the necessary state has been initialized (so that the worker processes can see this state). No more errors or exceptions.
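A minimal sketch of this arrangement using only the standard library, with plain threads standing in for CherryPy's request threads (Python 3; the body of fnc and the names serve and main are hypothetical): the pool is forked once while the process is still single-threaded, and each "request" thread then submits its work to that shared pool.

```python
import multiprocessing
import threading


def fnc(n):
    # Hypothetical stand-in for the expensive, server-independent computation.
    return sum(i * i for i in range(n))


def serve(pool, jobs):
    # Simulate concurrent request threads that share one pre-forked pool.
    results = {}

    def handler(name, args):
        # Each thread blocks on its own map() call; the workers do the math.
        results[name] = pool.map(fnc, args)

    threads = [
        threading.Thread(target=handler, args=(name, args))
        for name, args in jobs.items()
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


def main():
    # Fork the workers first, before any other thread exists.
    ctx = multiprocessing.get_context("fork")
    pool = ctx.Pool(2)
    try:
        return serve(pool, {"a": [1, 2, 3], "b": [4, 5]})
    finally:
        # Shut the workers down cleanly: close() first, then join().
        pool.close()
        pool.join()


if __name__ == "__main__":
    print(main())
```

Any state the workers need has to exist before the Pool is created, because forked workers only see a copy of the parent's memory as it was at fork time.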

Bottom line: forking a Pool from inside a request-handling thread doesn't work; creating the pool once, outside the request handler, does.

Upvotes: 4

Views: 2178

Answers (1)

Sylvain Hellegouarch

Reputation: 851

Which type of object does 'self' refer to? At which point do you initialize and start your forked process? Perhaps a little more code would help diagnose the issue.

Okay, this works just fine:

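import multiprocessing
import os
import time

import cherrypy

def run_in_sub_proc(size):
    # Runs in a worker process; prints the worker's PID and a counter.
    for i in range(size):
        print("%d %d" % (os.getpid(), i))
        time.sleep(1)

# Fork the workers once, before CherryPy starts its request threads.
pool = multiprocessing.Pool(2)

def stop_pool():
    # join() is only valid after close() (or terminate()).
    pool.close()
    pool.join()

class Root(object):
    @cherrypy.expose
    def index(self):
        # Hand the work to the pre-forked pool without blocking the request.
        pool.map_async(run_in_sub_proc, (3, 5))

if __name__ == '__main__':
    cherrypy.engine.subscribe('stop', stop_pool)
    cherrypy.quickstart(Root())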

Upvotes: 4
