Reputation: 6352
I have an application that uses both grequests and multiprocessing.managers for a combination of IPC and asynchronous RESTful communication over HTTP.
It seems that grequests, by calling gevent.monkey's patch_all() method, breaks the multiprocessing.connection module used by the multiprocessing.managers.SyncManager class and its derivatives.
This is apparently not an isolated issue: it affects anything built on multiprocessing.connection, such as multiprocessing.pool.
Drilling down into the code in gevent/monkey.py, I found that swapping the stdlib socket module for gevent.socket is what causes the breakage.
The swap happens at line 115 of gevent/monkey.py, in the patch_socket() function:
def patch_socket(dns=True, aggressive=True):
    """Replace the standard socket object with gevent's cooperative sockets.
    ...
    _socket.socket = socket.socket # This line breaks multiprocessing.connection!
    ...
My question is: why does this swap break multiprocessing.connection, and what advantages are derived from using gevent.socket instead of the stdlib's socket module? That is, what performance loss, if any, will I incur from not patching the socket module?
Traceback (most recent call last):
  File "clientWithGeventMonkeyPatch.py", line 49, in <module>
    client = GetClient(host, port, authkey)
  File "clientWithGeventMonkeyPatch.py", line 39, in GetClient
    client.connect()
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 500, in connect
    conn = Client(self._address, authkey=self._authkey)
  File "/usr/lib/python2.7/multiprocessing/connection.py", line 175, in Client
    answer_challenge(c, authkey)
  File "/usr/lib/python2.7/multiprocessing/connection.py", line 414, in answer_challenge
    response = connection.recv_bytes(256) # reject large message
IOError: [Errno 11] Resource temporarily unavailable
(On Ubuntu Server 11.10, Python 2.7.3, with gevent, greenlet, and grequests installed.)
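When reproducing this, it can help to confirm whether the swap has actually happened in the current process. The following is a minimal sketch, not part of gevent or grequests: the helper name socket_is_patched is hypothetical, and it rests on the assumption that gevent's replacement class is defined in a module whose name starts with "gevent".

```python
import socket


def socket_is_patched():
    """Return True if socket.socket appears to come from gevent, not the stdlib."""
    # Assumption: gevent's replacement socket class lives in a module whose
    # name starts with "gevent"; the stdlib class reports "socket".
    module_name = getattr(socket.socket, "__module__", "") or ""
    return module_name.startswith("gevent")


if __name__ == "__main__":
    print("socket.socket defined in: {}".format(socket.socket.__module__))
    print("patched by gevent: {}".format(socket_is_patched()))
```

Calling it before and after patch_all() in the failing client should show whether the socket=False variant really left the stdlib class in place.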
manager.py
## manager.py
import multiprocessing
import multiprocessing.managers
import datetime

class LocalManager(multiprocessing.managers.SyncManager):
    def __init__(self, *args, **kwargs):
        multiprocessing.managers.SyncManager.__init__(self, *args, **kwargs)
        self.__type__ = 'LocalManager'

def GetManager(host, port, authkey):
    def getdatetime():
        return '{}'.format(datetime.datetime.now())
    LocalManager.register('getdatetime', callable = getdatetime)
    manager = LocalManager(address = (host, port), authkey = authkey)
    manager.start()
    return manager

if __name__ == '__main__':
    # define our manager connection parameters
    port = 55555
    host = 'localhost'
    authkey = 'auth1234'
    # start a manager
    man = GetManager(host, port, authkey)
    # wait for user input to shut down
    raw_input('return to shutdown')
    man.shutdown()
client.py
## client.py -- this one works
import time
import multiprocessing.managers

class RemoteClient(multiprocessing.managers.SyncManager):
    def __init__(self, *args, **kwargs):
        multiprocessing.managers.SyncManager.__init__(self, *args, **kwargs)
        self.__type__ = 'RemoteClient'

def GetClient(host, port, authkey):
    RemoteClient.register('getdatetime')
    client = RemoteClient(address = (host, port), authkey = authkey)
    client.connect()
    return client

if __name__ == '__main__':
    # define our client connection parameters
    port = 55555
    host = 'localhost'
    authkey = 'auth1234'
    # connect to the running manager
    client = GetClient(host, port, authkey)
    print 'connected', client
    print 'client.getdatetime()', client.getdatetime()
    # wait a couple of seconds, then do it again
    time.sleep(2)
    print 'client.getdatetime()', client.getdatetime()
    # exit...
clientWithGeventMonkeyPatch.py
## clientWithGeventMonkeyPatch.py -- breaks, depending on patch_all() parameters
import time
import multiprocessing.managers

# this part is copied from grequests
# bear in mind that it doesn't actually do anything in this module.
try:
    import gevent
    from gevent import monkey as curious_george
    from gevent.pool import Pool
except ImportError:
    raise RuntimeError('Gevent is required for grequests.')

# this line causes breakage of the multiprocessing.managers connection auth method:
# Monkey-patch.
# patch_all() parameters with default values: socket=True, dns=True, time=True, select=True, thread=True, os=True, ssl=True, aggressive=True
curious_george.patch_all(thread=False, select=False) # breaks
#~ curious_george.patch_all(thread=False, select=False, socket = False) # works!
#~ curious_george.patch_all(thread=False, select=False, socket = True, aggressive = True, dns = True) # same as (thread=False, select=False); breaks
#~ curious_george.patch_all(thread=False, select=False, socket = True, aggressive = True, dns = False) # breaks
#~ curious_george.patch_all(thread=False, select=False, socket = True, aggressive = False, dns = True) # breaks
#~ curious_george.patch_all(thread=False, select=False, socket = True, aggressive = False, dns = False) # breaks

class RemoteClient(multiprocessing.managers.SyncManager):
    def __init__(self, *args, **kwargs):
        multiprocessing.managers.SyncManager.__init__(self, *args, **kwargs)
        self.__type__ = 'RemoteClient'

def GetClient(host, port, authkey):
    RemoteClient.register('getdatetime')
    client = RemoteClient(address = (host, port), authkey = authkey)
    client.connect()
    return client

if __name__ == '__main__':
    # define our client connection parameters
    port = 55555
    host = 'localhost'
    authkey = 'auth1234'
    # connect to the running manager
    client = GetClient(host, port, authkey)
    print 'connected', client
    print 'client.getdatetime()', client.getdatetime()
    # wait a couple of seconds, then do it again
    time.sleep(2)
    print 'client.getdatetime()', client.getdatetime()
    # exit...
Upvotes: 5
Views: 2682
Reputation: 527053
If you don't patch the socket module, gevent's ability to avoid blocking on network operations won't be available, and so you lose most of the benefit of using gevent in the first place.
gevent and multiprocessing aren't really designed to play nicely with one another: gevent mostly assumes that you're doing your network connections through it, and not bypassing the highest-level Python socket interfaces (which multiprocessing does).
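One plausible reading of the Errno 11 in the traceback, offered as an assumption rather than something confirmed here: a cooperative socket keeps its OS-level descriptor in non-blocking mode, and code that reads the raw descriptor directly instead of going through the socket object gets EAGAIN when no data has arrived yet. The stdlib-only sketch below imitates that situation on a Unix-like system without gevent; raw_read_errno is a hypothetical helper name.

```python
import errno
import os
import socket


def raw_read_errno():
    """Read a dup'd descriptor of a non-blocking socket; return the errno or None."""
    a, b = socket.socketpair()
    try:
        # Stand-in for what a cooperative socket does to the OS-level descriptor.
        a.setblocking(False)
        # Keep only the raw descriptor, bypassing the socket object's methods.
        # The non-blocking flag is shared with the duplicated descriptor.
        fd = os.dup(a.fileno())
        try:
            os.read(fd, 256)  # the peer has not sent anything yet
        except OSError as exc:
            return exc.errno
        finally:
            os.close(fd)
        return None
    finally:
        a.close()
        b.close()


if __name__ == "__main__":
    err = raw_read_errno()
    print("errno name: {}".format(errno.errorcode.get(err)))
    print("is EAGAIN: {}".format(err == errno.EAGAIN))
```

If that reading is right, the choice is between leaving socket unpatched (socket=False, as in the working variant above) and keeping multiprocessing traffic out of the patched process.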
Upvotes: 7