George Sovetov

Reputation: 5238

Pure Python bytes pipe

How do I make two objects connected to one circular (preferably dynamically resized) buffer, so that they can be treated as the two sides of a pipe or socket? I.e. one object can read from a FIFO buffer and close the read side, while the other object can write to it and close the write side. I don't need any poll/select functionality.

One process, one thread. No IPC or synchronization involved. (This has already been implemented separately.)

They should serve as adapters for data coming from other sources which are not file-like objects but can be represented as streams.

If I wrote it by hand, I would feel like I was reinventing the wheel.

Maybe some combination of classes from the io module can do the trick. Or are there cross-platform OS-level FIFO/pipe objects?

The solution has to be memory-efficient.

Upvotes: 1

Views: 426

Answers (2)

George Sovetov

Reputation: 5238

A true cross-platform pipe with real file handles, usable even with subprocess, is multiprocessing.Pipe. Use the file descriptors of its connections:

import multiprocessing
import os

conn_out, conn_in = multiprocessing.Pipe(duplex=False)
os.write(conn_in.fileno(), b'Hello.')
print(os.read(conn_out.fileno(), 10))

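Note that the connection methods send and recv pickle Python objects rather than passing raw bytes, while send_bytes and recv_bytes transfer each bytes object as a separate framed message (on Unix, each message is prefixed with a length header). For a plain byte stream, read and write on the file descriptors as above.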
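The question also asks about closing the write side. A minimal sketch of how that looks with the descriptors above, assuming a POSIX system where fileno() returns an ordinary file descriptor (the names first and second are only for illustration):

```python
import multiprocessing
import os

conn_out, conn_in = multiprocessing.Pipe(duplex=False)
os.write(conn_in.fileno(), b'Hello.')
conn_in.close()  # close the write side

# Data written before the close is still delivered to the reader.
first = os.read(conn_out.fileno(), 10)
# Once the buffer is drained, an empty result signals end of stream.
second = os.read(conn_out.fileno(), 10)
print(first, second)

conn_out.close()  # close the read side
```

Without the close, the second os.read would block, because the OS pipe waits for more data as long as a writer exists.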

Upvotes: 1

George Sovetov

Reputation: 5238

Not very efficient, but here it is.

from collections import deque


class AlreadyClosed(Exception):
    pass


class NothingToRead(Exception):
    pass


def make_pipe():
    queue = deque()

    class Reader(object):
        closed = False

        @staticmethod
        def read():
            if Reader.closed:
                raise AlreadyClosed()
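            try:
                # Drain buffered chunks first, even after the writer has closed.
                return queue.popleft()
            except IndexError:
                if Writer.closed:
                    # Buffer is empty and no more data can arrive: end of stream.
                    return b''
                raise NothingToRead()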

    class Writer(object):
        closed = False

        @staticmethod
        def write(chunk):
            if Writer.closed:
                raise AlreadyClosed()
            if Reader.closed:
                return 0
            queue.append(chunk)
            return len(chunk)

    return Reader, Writer


if __name__ == '__main__':
    r, w = make_pipe()
    w.write(b'qwe')
    w.write(b'asd')
    print(r.read())
    print(r.read())
    try:
        print(r.read())
    except NothingToRead as e:
        print(repr(e))
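If chunk boundaries don't matter and the reader wants read(size) semantics, a variant backed by a single growable bytearray may be closer to what the question asks for. This is only a sketch under the same one-thread assumption; BytePipe, PipeClosed, WouldBlock, close_read and close_write are hypothetical names, not part of any library.

```python
class PipeClosed(Exception):
    """Raised when a closed side of the pipe is used (hypothetical name)."""


class WouldBlock(Exception):
    """Raised when the buffer is empty but the writer is still open."""


class BytePipe:
    """Single-threaded in-memory byte pipe backed by one growable bytearray."""

    def __init__(self):
        self._buf = bytearray()
        self.read_closed = False
        self.write_closed = False

    def write(self, data):
        if self.write_closed:
            raise PipeClosed("write side is closed")
        if self.read_closed:
            return 0  # nobody will ever read this, so drop it
        self._buf += data
        return len(data)

    def read(self, size=-1):
        if self.read_closed:
            raise PipeClosed("read side is closed")
        if not self._buf:
            if self.write_closed:
                return b""  # buffer drained and writer gone: end of stream
            raise WouldBlock()
        if size < 0:
            size = len(self._buf)
        chunk = bytes(self._buf[:size])
        del self._buf[:size]  # discard the bytes that were just returned
        return chunk

    def close_read(self):
        self.read_closed = True
        self._buf.clear()  # unread data can never be consumed now

    def close_write(self):
        self.write_closed = True


pipe = BytePipe()
pipe.write(b"qwe")
pipe.write(b"asd")
first = pipe.read(4)   # reads across the original chunk boundary
rest = pipe.read()     # everything that is left
pipe.close_write()
eof = pipe.read()      # empty bytes once the writer has closed
print(first, rest, eof)
```

Unlike the deque version, the reader decides how many bytes to take, and memory use tracks the amount of unread data rather than the number of chunks written.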

Upvotes: 0
