Reputation: 1108
I have a Python function (implemented in C++) that reads from a file descriptor (wrapped in a FILE* on the C++ side), and I need to feed it from an asyncio.StreamReader. Specifically, the reader is the content of an HTTP response: aiohttp.ClientResponse.content.
I thought I might open a pipe, pass the read end to the C++ function, and connect the write end to asyncio's event loop. However, how can I move the data from the stream reader to the pipe with proper flow control and as little copying as possible?
The skeleton of the code, with the missing parts, is as follows:
# obtain the StreamReader from aiohttp
content = aiohttp_client_response.content
# create a pipe
(pipe_read_fd, pipe_write_fd) = os.pipe()
# now I need a suitable protocol to manage the pipe transport
protocol = ?
(pipe_transport, __) = loop.connect_write_pipe(lambda: protocol, pipe_write_fd)
# the protocol should start reading from `content` and writing into the pipe
return pipe_read_fd
Upvotes: 4
Views: 2810
Reputation: 13415
From the subprocess_attach_write_pipe asyncio example:
rfd, wfd = os.pipe()
pipe = open(wfd, 'wb', 0)
transport, _ = await loop.connect_write_pipe(asyncio.Protocol, pipe)
transport.write(b'data')
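EDIT - For write flow control, see the following methods: WriteTransport.set_write_buffer_limits, BaseProtocol.pause_writing and BaseProtocol.resume_writing.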
Here's a possible FlowControl implementation, inspired by StreamWriter.drain:
class FlowControl(asyncio.streams.FlowControlMixin):
    async def drain(self):
        await self._drain_helper()
Usage:
transport, protocol = await loop.connect_write_pipe(FlowControl, pipe)
transport.write(b'data')
await protocol.drain()
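Putting the pieces together, a minimal sketch of a pump from a StreamReader into the pipe might look like the following. It assumes a Unix event loop and relies on FlowControlMixin and _drain_helper, which are private asyncio details and may change between versions; pump_reader_into_pipe and chunk_size are hypothetical names chosen for illustration.

```python
import asyncio
import os


class FlowControl(asyncio.streams.FlowControlMixin):
    """Write-side protocol whose drain() waits while the transport is paused."""

    async def drain(self):
        # _drain_helper is a private asyncio helper (assumption: still present)
        await self._drain_helper()


async def pump_reader_into_pipe(reader, write_fd, chunk_size=64 * 1024):
    """Copy everything from `reader` into the pipe's write end, then close it."""
    loop = asyncio.get_running_loop()
    pipe = open(write_fd, "wb", 0)  # unbuffered file object that owns write_fd
    transport, protocol = await loop.connect_write_pipe(FlowControl, pipe)
    try:
        while True:
            chunk = await reader.read(chunk_size)
            if not chunk:  # read() returns b'' at EOF
                break
            transport.write(chunk)
            # back-pressure: suspends while the write buffer is above its limit
            await protocol.drain()
    finally:
        # flushes any buffered data, then closes the pipe so the reader sees EOF
        transport.close()
```

The read end can then be handed to the blocking consumer, for instance through loop.run_in_executor, while this coroutine runs on the event loop.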
Upvotes: 1
Reputation: 1108
I got around this issue by using a ThreadPoolExecutor and blocking calls to os.write:
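(read_fd, write_fd) = os.pipe()
task_1 = loop.create_task(pump_bytes_into_fd(write_fd))
# pass the callable and its argument separately, so it runs inside the executor
task_2 = loop.run_in_executor(executor_1, parse_bytes_from_fd, read_fd)

async def pump_bytes_into_fd(write_fd):
    try:
        while True:
            chunk = await stream.read(CHUNK_SIZE)
            if not chunk:  # read() returns b'' at EOF, never None
                break
            # process the chunk
            await loop.run_in_executor(executor_2, os.write, write_fd, chunk)
    finally:
        os.close(write_fd)  # close the write end so the reader sees EOF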
It is crucial that two different executors are used for the blocking reads and writes to avoid deadlocks: if both share a pool, a blocked reader can occupy the worker thread that the write needs.
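For reference, a self-contained sketch of this two-executor approach could look like the code below. Here parse_bytes_from_fd is a hypothetical stand-in for the blocking C++ parser, and write_all, run_pipeline and chunk_size are illustrative names that do not appear in the original code.

```python
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor


def parse_bytes_from_fd(read_fd):
    """Hypothetical stand-in for the blocking parser: reads until EOF."""
    with open(read_fd, "rb") as f:
        return f.read()


def write_all(fd, data):
    """Blocking write that loops until every byte has been written."""
    view = memoryview(data)
    while view:
        written = os.write(fd, view)
        view = view[written:]


async def pump_bytes_into_fd(stream, write_fd, executor, chunk_size=64 * 1024):
    loop = asyncio.get_running_loop()
    try:
        while True:
            chunk = await stream.read(chunk_size)
            if not chunk:  # b'' signals EOF
                break
            # the blocking write runs in its own pool, off the event loop
            await loop.run_in_executor(executor, write_all, write_fd, chunk)
    finally:
        os.close(write_fd)  # lets the reader side see EOF


async def run_pipeline(stream):
    loop = asyncio.get_running_loop()
    read_fd, write_fd = os.pipe()
    # separate single-worker pools: the reader can never starve the writer
    with ThreadPoolExecutor(max_workers=1) as reader_pool, \
         ThreadPoolExecutor(max_workers=1) as writer_pool:
        parsed = loop.run_in_executor(reader_pool, parse_bytes_from_fd, read_fd)
        await pump_bytes_into_fd(stream, write_fd, writer_pool)
        return await parsed
```

The pipe's kernel buffer provides the flow control here: write_all blocks in its worker thread whenever the reader falls behind, and the coroutine simply waits for it.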
Upvotes: 0