Davoud Taghawi-Nejad
Davoud Taghawi-Nejad

Reputation: 16826

zeromq and python multiprocessing, too many open files

I have an agent-based model, where several agents are started by a central process and communicate via another central process. Every agent and the communication process communicate via zmq. However, when I start more than 100 agents standard_out sends:

Invalid argument (src/stream_engine.cpp:143) Too many open files (src/ipc_listener.cpp:292)

and Mac Os prompts a problem report :

Python quit unexpectedly while using the libzmq.5.dylib plug-in.

The problem appears to me that too many contexts are opened. But how can I avoid this with multiprocessing?

I attach part of the code below:

class Agent(Database, Logger, Trade, Messaging, multiprocessing.Process):
    def __init__(self, idn, group, _addresses, trade_logging):
        multiprocessing.Process.__init__(self)
        ....

    def run(self):
        self.context = zmq.Context()
        self.commands = self.context.socket(zmq.SUB)
        self.commands.connect(self._addresses['command_addresse'])
        self.commands.setsockopt(zmq.SUBSCRIBE, "all")
        self.commands.setsockopt(zmq.SUBSCRIBE, self.name)
        self.commands.setsockopt(zmq.SUBSCRIBE, group_address(self.group))

        self.out = self.context.socket(zmq.PUSH)
        self.out.connect(self._addresses['frontend'])
        time.sleep(0.1)
        self.database_connection = self.context.socket(zmq.PUSH)
        self.database_connection.connect(self._addresses['database'])
        time.sleep(0.1)
        self.logger_connection = self.context.socket(zmq.PUSH)
        self.logger_connection.connect(self._addresses['logger'])

        self.messages_in = self.context.socket(zmq.DEALER)
        self.messages_in.setsockopt(zmq.IDENTITY, self.name)
        self.messages_in.connect(self._addresses['backend'])

        self.shout = self.context.socket(zmq.SUB)
        self.shout.connect(self._addresses['group_backend'])
        self.shout.setsockopt(zmq.SUBSCRIBE, "all")
        self.shout.setsockopt(zmq.SUBSCRIBE, self.name)
        self.shout.setsockopt(zmq.SUBSCRIBE, group_address(self.group))

        self.out.send_multipart(['!', '!', 'register_agent', self.name])

        while True:
            try:
                self.commands.recv()  # catches the group adress.
            except KeyboardInterrupt:
                print('KeyboardInterrupt: %s,self.commands.recv() to catch own adress ~1888' % (self.name))
                break
            command = self.commands.recv()
            if command == "!":
                subcommand = self.commands.recv()
                if subcommand == 'die':
                    self.__signal_finished()
                    break
            try:
                self._methods[command]()
            except KeyError:
                if command not in self._methods:
                    raise SystemExit('The method - ' + command + ' - called in the agent_list is not declared (' + self.name)
                else:
                    raise
            except KeyboardInterrupt:
                print('KeyboardInterrupt: %s, Current command: %s ~1984' % (self.name, command))
                break

            if command[0] != '_':
                self.__reject_polled_but_not_accepted_offers()
                self.__signal_finished()
        #self.context.destroy()

the whole code is under http://www.github.com/DavoudTaghawiNejad/abce

Upvotes: 10

Views: 11197

Answers (3)

Kessem Lee
Kessem Lee

Reputation: 1373

You should increase the number of allowed open files for the process as in this question: Python: Which command increases the number of open files on Windows?

the default per process is 512

import win32file
print win32file._getmaxstdio() #512

win32file._setmaxstdio(1024)
print win32file._getmaxstdio() #1024

Upvotes: 0

kelunik
kelunik

Reputation: 6928

This has nothing to do with zeromq nor python. It's the underlying operating system, that allow only up to a certain threshold of concurrently opened files. This limit includes normal files, but also socket connections.

You can see your current limit using ulimit -n, it will probably default to 1024. Machines running servers or having other reasons (like your multiprocessing) often require to set this limit higher or just to unlimited. – More info about ulimit.

Additionally, there's another global limit, however it's nothing I had to adjust yet.

In general, you should ask yourself, if you really need that many agents. Usually, X / 2X worker processes should be enough, where X corresponds to your CPU count.

Upvotes: 9

Jason
Jason

Reputation: 13766

Odds are it's not too many contexts, it's too many sockets. Looking through your repo, I see you're (correctly) using IPC as your transport; IPC uses a file descriptor as the "address" to pass data back and forth between different processes. If I'm reading correctly, you're opening up to 7 sockets per process, so that'll add up quickly. I'm betting that if you do some debugging in the middle of your code, you'll see that it doesn't fail when the last context is created, but when the last socket pushes the open file limit over the edge.

My understanding is that the typical user limit for open FDs is around 1000, so at around 100 agents you're pushing 700 open FDs just for your sockets. The remainder is probably just typical. There should be no problem increasing your limit up to 10,000, higher depending on your situation. Otherwise you'll have to rewrite to use less sockets per process to get a higher process limit.

Upvotes: 9

Related Questions