Reputation: 508
When using asyncio to receive multiple files over a TCP socket, I struggle with the order in which data_received is called. When I send 3 data streams at once, my output is the following:
DEBUG connection_made ('127.0.0.1', 33972) new connection was made
DEBUG connection_made ('127.0.0.1', 33974) new connection was made
DEBUG connection_made ('127.0.0.1', 33976) new connection was made
DEBUG data_received ('127.0.0.1', 33976) data received from new connection
DEBUG data_received ('127.0.0.1', 33974) data received from new connection
DEBUG data_received ('127.0.0.1', 33972) data received from new connection
I assume that this behaviour is analogous to a stack, where data is received from the newest connection first and the oldest last, but this is only a guess.
Is it possible to change that behaviour so that the data is received in the order in which the connections were made? This is important because I need the data received from the first connection in order to process the following connections.
My code is the following:
import asyncio
import logging
import time

logger = logging.getLogger(__name__)

class AIO(asyncio.Protocol):
    def __init__(self):
        self.extra = bytearray()

    def connection_made(self, transport):
        global request_time
        peer = transport.get_extra_info('peername')
        logger.debug('{} new connection was made'.format(peer))
        self.transport = transport
        request_time = str(time.time())

    def data_received(self, request, msgid=1):
        peer = self.transport.get_extra_info('peername')
        logger.debug('{} data received from new connection'.format(peer))
        self.extra.extend(request)
Upvotes: 0
Views: 320
Reputation: 155216
First, I would recommend using the higher-level streams API instead of transports/protocols. Then, if you need to maintain the order observed when the connections were made, you can enforce it yourself using a series of asyncio.Event objects. For example:
import asyncio

class Comm:
    def __init__(self):
        self.extra = bytearray()
        self.preceding_evt = None

    async def serve(self, r, w):
        preceding_evt = self.preceding_evt
        self.preceding_evt = this_evt = asyncio.Event()

        while True:
            request = await r.read(4096)
            if not request:
                # EOF: stop reading so that the writer is closed below
                break
            # wait for the preceding connection to receive and store data
            # before we store ours
            if preceding_evt is not None:
                await preceding_evt.wait()
                preceding_evt.clear()
            # self.extra now contains data from previous connections
            self.extra.extend(request)
            # inform the subsequent connection that we got data
            this_evt.set()
        w.close()

async def main():
    comm = Comm()
    server = await asyncio.start_server(comm.serve, '127.0.0.1', 8888)
    async with server:
        await server.serve_forever()

asyncio.run(main())
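If each connection carries exactly one file and the sender closes the connection when it is done (an assumption, the question does not say so), a simpler variant of the same idea may be enough: read the whole payload first, then wait once for the preceding connection to finish before storing it. The sketch below is only an illustration of that variant. The names OrderedReceiver and demo are made up for this example, and the short sleeps in demo exist only to make the connection order deterministic on a local machine.

```python
import asyncio


class OrderedReceiver:
    """Hypothetical helper: stores each payload in connection order."""

    def __init__(self):
        self.extra = bytearray()
        self._last_done = None  # Event of the most recently started handler

    async def serve(self, reader, writer):
        # Take a place in the chain before the first await, so the order
        # matches the order in which the handlers were started.
        preceding_done = self._last_done
        self._last_done = this_done = asyncio.Event()
        try:
            payload = await reader.read()  # read until the peer sends EOF
            if preceding_done is not None:
                await preceding_done.wait()
            self.extra.extend(payload)
        finally:
            # Always release the next connection, even if reading failed.
            this_done.set()
            writer.close()


async def demo():
    receiver = OrderedReceiver()
    # Port 0 asks the OS for any free port (assumption: local test only).
    server = await asyncio.start_server(receiver.serve, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        writers = []
        for _ in range(3):
            _, w = await asyncio.open_connection('127.0.0.1', port)
            writers.append(w)
            await asyncio.sleep(0.05)  # let the server start this handler
        chunks = (b'first ', b'second ', b'third')
        # Send the data in the reverse of the connection order.
        for w, chunk in reversed(list(zip(writers, chunks))):
            w.write(chunk)
            w.close()
            await w.wait_closed()
            await asyncio.sleep(0.05)
        await asyncio.sleep(0.1)  # give the handlers time to finish
    return bytes(receiver.extra)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Even though the last connection sends its data first, the payloads should end up in receiver.extra in the order in which the connections were made. The trade-off is that nothing from a later connection is stored until every earlier connection has been closed by its sender.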
Upvotes: 1