Jan Špaček

Reputation: 1108

Pump bytes from asyncio StreamReader into a file descriptor

I have a Python function (implemented in C++) that reads from a file descriptor (wrapped in a FILE* on the C++ side), and I need to feed this function from an asyncio.StreamReader. Specifically, the reader is the content of an HTTP response: aiohttp.ClientResponse.content.

I thought I might open a pipe, pass the read-end to the C++ function and connect the write-end to asyncio's event loop. However, how can I move the data from the stream reader to the pipe, with proper flow control and as little copying as possible?

The skeleton of the code with the missing parts is as follows:

# obtain the StreamReader from aiohttp
content = aiohttp_client_response.content
# create a pipe
(pipe_read_fd, pipe_write_fd) = os.pipe()

# now I need a suitable protocol to manage the pipe transport
protocol = ?
(pipe_transport, __) = await loop.connect_write_pipe(lambda: protocol, pipe_write_fd)

# the protocol should start reading from `content` and writing into the pipe
return pipe_read_fd

Upvotes: 4

Views: 2810

Answers (2)

Vincent

Reputation: 13415

From the subprocess_attach_write_pipe asyncio example:

rfd, wfd = os.pipe()
pipe = open(wfd, 'wb', 0)
transport, _ = await loop.connect_write_pipe(asyncio.Protocol, pipe)
transport.write(b'data')

EDIT - For write flow control, see transport.set_write_buffer_limits() and the protocol callbacks pause_writing() / resume_writing().

Here's a possible FlowControl implementation, inspired by StreamWriter.drain:

class FlowControl(asyncio.streams.FlowControlMixin):
    async def drain(self):
        # wait until the transport's write buffer drops below the high-water mark
        await self._drain_helper()

Usage:

transport, protocol = await loop.connect_write_pipe(FlowControl, pipe)
transport.write(b'data')
await protocol.drain()
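Filled out into a self-contained sketch of this pattern: the StreamReader below is fed by hand as a stand-in for aiohttp's response content, and `pump` / `_read_all` are hypothetical names, not part of any library.

```python
import asyncio
import os


class FlowControl(asyncio.streams.FlowControlMixin):
    async def drain(self):
        # block while the transport's write buffer is above the high-water mark
        await self._drain_helper()


async def pump(reader, write_fd, chunk_size=65536):
    """Copy everything from an asyncio.StreamReader into a pipe, with back-pressure."""
    loop = asyncio.get_running_loop()
    pipe = os.fdopen(write_fd, "wb", 0)  # unbuffered file object around the write end
    transport, protocol = await loop.connect_write_pipe(FlowControl, pipe)
    try:
        while True:
            chunk = await reader.read(chunk_size)
            if not chunk:  # StreamReader.read returns b'' at EOF
                break
            transport.write(chunk)
            await protocol.drain()  # flow control: wait for the pipe to drain
    finally:
        transport.close()  # closes the write end so the consumer sees EOF


def _read_all(fd):
    # stand-in for the C++ consumer: blocking reads until EOF
    out = b""
    while True:
        chunk = os.read(fd, 65536)
        if not chunk:
            return out
        out += chunk


async def main():
    # stand-in for aiohttp.ClientResponse.content
    reader = asyncio.StreamReader()
    payload = b"x" * 300_000
    reader.feed_data(payload)
    reader.feed_eof()

    read_fd, write_fd = os.pipe()
    loop = asyncio.get_running_loop()
    # consume the read end in a thread, as the C++ side would
    consumer = loop.run_in_executor(None, _read_all, read_fd)
    await pump(reader, write_fd)
    return await consumer


data = asyncio.run(main())
```

The 300 KB payload is larger than a typical pipe buffer, so `drain()` actually has to wait for the consumer; without it, the transport would buffer the whole payload in memory.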

Upvotes: 1

Jan Špaček

Reputation: 1108

I got around this issue by using a ThreadPoolExecutor and blocking calls to os.write:

async def pump_bytes_into_fd(write_fd):
    while True:
        chunk = await stream.read(CHUNK_SIZE)
        if not chunk: break  # StreamReader.read returns b'' at EOF
        # process the chunk
        await loop.run_in_executor(executor_2, os.write, write_fd, chunk)

(read_fd, write_fd) = os.pipe()
task_1 = loop.create_task(pump_bytes_into_fd(write_fd))
task_2 = loop.run_in_executor(executor_1, parse_bytes_from_fd, read_fd)

It is crucial that two different executors are used for the blocking reads and writes: if both shared a single-threaded executor, the blocked os.write could hold the only thread that the reader needs to drain the pipe (or vice versa), deadlocking the transfer.
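A runnable version of this two-executor pattern, under stated assumptions: `parse_bytes_from_fd` here is a stand-in for the C++ parser, the StreamReader is fed by hand instead of by aiohttp, and `write_all` is a hypothetical helper that loops because os.write may perform a partial write.

```python
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor

CHUNK_SIZE = 65536


def write_all(fd, data):
    # os.write may write fewer bytes than requested; loop until the chunk is gone
    view = memoryview(data)
    while view:
        n = os.write(fd, view)
        view = view[n:]


async def pump_bytes_into_fd(loop, stream, write_fd, executor):
    try:
        while True:
            chunk = await stream.read(CHUNK_SIZE)
            if not chunk:  # b'' signals EOF
                break
            # blocking write runs in its own executor; it blocks when the pipe is full
            await loop.run_in_executor(executor, write_all, write_fd, chunk)
    finally:
        os.close(write_fd)  # close the write end so the reader sees EOF


def parse_bytes_from_fd(read_fd):
    # stand-in for the C++ parser: blocking reads until EOF
    data = b""
    while True:
        chunk = os.read(read_fd, CHUNK_SIZE)
        if not chunk:
            return data
        data += chunk


async def main():
    loop = asyncio.get_running_loop()
    stream = asyncio.StreamReader()
    payload = b"y" * 200_000
    stream.feed_data(payload)
    stream.feed_eof()

    read_fd, write_fd = os.pipe()
    # single-threaded executors make the deadlock hazard concrete:
    # the reader and the writer must never compete for the same thread
    executor_1 = ThreadPoolExecutor(max_workers=1)
    executor_2 = ThreadPoolExecutor(max_workers=1)
    task_1 = loop.create_task(pump_bytes_into_fd(loop, stream, write_fd, executor_2))
    task_2 = loop.run_in_executor(executor_1, parse_bytes_from_fd, read_fd)
    await task_1
    return await task_2


result = asyncio.run(main())
```

The payload is several pipe buffers long, so the writer genuinely blocks and is drained by the reader thread, which is exactly the situation where sharing one executor would deadlock.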

Upvotes: 0
