Reputation: 4021
I'm looking for some 'beyond basic' guidance on usage patterns for the StreamReader and StreamWriter classes in the Python asyncio package.
I am attempting to build a stateful server with a custom protocol using protobuf. Should I be sub-classing the StreamReader and StreamWriter to manage the serialization from protobuf bytes? I could then provide a read_message function on the reader. I know I can copy the code from streams.start_server providing my own StreamReader, but how do I set my StreamWriter?
Any pointers or examples gratefully received.
Upvotes: 4
Views: 2277
Reputation: 4021
I found it relatively straightforward to subclass the asyncio.streams library classes.
The start_server function is lifted from the tcp server example:
@asyncio.coroutine
def start_server(self, loop):
    def factory():
        reader = QbpStreamReader()
        return QbpStreamReaderProtocol(reader, self._accept_client)

    logger.info("QbpServer starting at tcp://%s:%s", self.host, self.port)
    self.server = yield from loop.create_server(factory, self.host, self.port)
I had to subclass StreamReaderProtocol so that connection_made constructs my own StreamWriter. Other than that, it is the same as the library method.
from asyncio import coroutines, streams

class QbpStreamReaderProtocol(streams.StreamReaderProtocol):
    # Note: this relies on private attributes of StreamReaderProtocol.
    def connection_made(self, transport):
        self._stream_reader.set_transport(transport)
        if self._client_connected_cb is not None:
            # Build the custom writer instead of the stock StreamWriter.
            self._stream_writer = QbpStreamWriter(transport, self,
                                                  self._stream_reader,
                                                  self._loop)
            res = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)
For outgoing messages:
class QbpStreamWriter(streams.StreamWriter):
    def write_msg(self, msg):
        # data = serialise message
        self.write(data)
And for incoming messages:
class QbpStreamReader(streams.StreamReader):
    @asyncio.coroutine
    def read_msg(self):
        data = yield from self.readexactly(header_length)
        # msg_type, msg_length = unpack header
        data = yield from self.readexactly(msg_length)
        return build_message(msg_type, data)
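The header handling is left as comments in the snippets above. As a rough, self-contained sketch of what the framing could look like, here is one possibility. The wire format (a 1-byte message type followed by a 4-byte big-endian body length) and the names HEADER, pack_frame, read_frame and demo are all assumptions made up for illustration, not part of my actual protocol. It uses async/await rather than @asyncio.coroutine, which newer Python versions no longer provide.

```python
import asyncio
import struct

# Hypothetical header: 1-byte message type + 4-byte big-endian body length.
HEADER = struct.Struct("!BI")


def pack_frame(msg_type: int, body: bytes) -> bytes:
    """Prefix an already-serialised body with the header."""
    return HEADER.pack(msg_type, len(body)) + body


async def read_frame(reader: asyncio.StreamReader):
    """Read one header, then exactly as many body bytes as it announces."""
    header = await reader.readexactly(HEADER.size)
    msg_type, msg_length = HEADER.unpack(header)
    body = await reader.readexactly(msg_length)
    return msg_type, body


async def demo():
    # Feed a frame straight into a StreamReader, so no socket is needed.
    reader = asyncio.StreamReader()
    reader.feed_data(pack_frame(7, b"hello"))
    reader.feed_eof()
    return await read_frame(reader)
```

With protobuf, the body would presumably be the serialised message bytes, and the message type would select which class parses them.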
Hope it helps someone.
Upvotes: 3
Reputation: 17366
Instead of deriving from StreamReader/StreamWriter, I suggest writing your own class(es) with a similar API.
For example, I did this for the aiozmq library: https://github.com/aio-libs/aiozmq/blob/master/aiozmq/stream.py
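To illustrate the idea in a few lines, here is a minimal sketch of a wrapper that holds a reader/writer pair rather than subclassing them. It is not taken from aiozmq; the class name MessageStream, the handle_client callback and the header layout (type byte plus 4-byte length) are hypothetical and chosen only for the example.

```python
import asyncio
import struct

# Hypothetical header: 1-byte message type + 4-byte big-endian body length.
HEADER = struct.Struct("!BI")


class MessageStream:
    """Wraps a reader/writer pair instead of subclassing them."""

    def __init__(self, reader, writer):
        self._reader = reader
        self._writer = writer

    async def read_msg(self):
        header = await self._reader.readexactly(HEADER.size)
        msg_type, length = HEADER.unpack(header)
        return msg_type, await self._reader.readexactly(length)

    async def write_msg(self, msg_type, body):
        self._writer.write(HEADER.pack(msg_type, len(body)) + body)
        await self._writer.drain()


async def handle_client(reader, writer):
    """Connection callback: echo each message until the peer disconnects."""
    stream = MessageStream(reader, writer)
    try:
        while True:
            msg_type, body = await stream.read_msg()
            await stream.write_msg(msg_type, body)
    except asyncio.IncompleteReadError:
        pass  # peer closed the connection
    finally:
        writer.close()
```

Because the wrapper only needs the stock reader and writer, handle_client can be passed directly to asyncio.start_server(handle_client, host, port), with no custom protocol class required.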
Upvotes: 3