jldupont
jldupont

Reputation: 96884

zeromq zmq.Poller & stdin

Is it possible to use zmq.Poller to also poll for data availability on stdin? If not, what would be the most efficient wait to poll, at the some time (ideally), for data availability on zeromq sockets & stdin?

Upvotes: 5

Views: 2106

Answers (2)

André Caron
André Caron

Reputation: 45304

If you are sure you will never run on Windows, you can simply register sys.stdin with a zmq.Poller (as described by minrk, above).

However, the select() implementation in Winsock only supports sockets and cannot be used to poll "regular" file descriptors like the standard input. Therefore, when running on Windows, you need to bridge the standard input with a 0MQ socket on the inproc transport.

Suggested implementation using an exclusive pair socket:

def forward_lines(stream, socket):
    """Read lines from `stream` and send them over `socket`."""
    try:
        line = stream.readline()
        while line:
            socket.send(line[:-1])
            line = stream.readline()
        socket.send('')  # send "eof message".
    finally:
        # NOTE: `zmq.Context.term()` in the main thread will block until this
        #       socket is closed, so we can't run this function in daemon
        #       thread hoping that it will just close itself.
        socket.close()


def forward_standard_input(context):
    """Start a thread that will bridge the standard input to a 0MQ socket and
    return an exclusive pair socket from which you can read lines retrieved
    from the standard input.  You will receive a final empty line when the EOF
    character is input to the keyboard."""
    reader = context.socket(zmq.PAIR)
    reader.connect('inproc://standard-input')
    writer = context.socket(zmq.PAIR)
    writer.bind('inproc://standard-input')
    thread = threading.Thread(target=forward_lines,
                              args=(sys.stdin, writer))
    thread.start()
    return reader


if __name__ == '__main__':
    context = zmq.Context()
    reader = forward_standard_input(context)
    poller = zmq.Poller()
    poller.register(reader, zmq.POLLIN)
    poller.register(...)

    events = dict(poller.poll())
    if events.get(reader, 0) & zmq.POLLIN:
        line = reader.recv()
        # process line.
    if events.get(..., 0) & zmq.POLLIN:
        # ...

Upvotes: 1

minrk
minrk

Reputation: 38668

yes, zmq pollers do support native FDs, including stdin, etc., so you just need to check sys.stdin.fileno():

poller = zmq.Poller()
poller.register(sys.stdin, zmq.POLLIN)
poller.register(mysocket, zmq.POLLIN)
evts = dict(poller.poll(1000))
stdin_ready = evts.get(sys.stdin.fileno(), False)
socket_ready = evts.get(mysocket, False)

Upvotes: 4

Related Questions