jake wong
jake wong

Reputation: 5228

Asyncio Transport and Protocols stream data received into a bytearray for processing

I"m following the example in TCP Client/Server, but i'm having trouble piping the results from data_received into my other functions for processing down the line.

By reproducing their example in my FactoryTCP.py file, and calling loop.create_connection from my main.py file, I am able to establish a connection to the server and send 1 message, and get the reply.

However, this reply, is printed from FactoryTCP.py file, and not streamed / piped into main.py file. Ideally, I would like for the data that is being received, to be streamed / piped into a bytesarray so that I can further manipulate it.

This is an example of what I have:

FactoryTCP.py

class ProtocolFactoryTCP(Transport, Protocol):

    def __init__(self,  on_con_lost, main_buffer):
        super().__init__()
        self.on_con_lost = on_con_lost
        self.transport = None
        self.main_buffer = main_buffer  # I had thought that my initializing this, 
                                        # I can extend into it and receive the file from `main.py`. 
                                        # But this is not the case.

    def connection_made(self, transport):
        self.transport = transport

    def connection_lost(self, exc):
        self.on_con_lost.set_result(True)
        print('The server | client closed the connection')

    def data_received(self, data):
        print(f'Data received: {data}, {self.transport.get_extra_info("peername")}')

    def eof_received(self):
        print("end of file received.")

main.py

async def main():

    main_buffer = bytesarray()
    on_con_lost = event_loop.create_future()
    connection = ProtocolFactoryTCP(on_con_lost, main_buffer)
    transport, protocol = await event_loop.create_connection(lambda: connection, "peer_ip", "peer_port")

    print('reading transport', transport.is_reading())
    print('writing transport', transport.write(message))  # Just a test to see if i can write outside. It works.
    print('writing transport', transport.write(extra_msg))  # Just a test to see if i can write outside. It works.

    try:
        await on_con_lost
        print('awaiting for con lost')
    except Exception as e:
        print(e)


event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(main())

With this current code, I am able to send messages outside of ProtocolFactoryTCP, but how do I get my hands on the data_received outside of ProtocolFactoryTCP as well? This is so that I can create different handlers and functions to deal with the result and respond back to it while the connection is not broken.

Upvotes: 0

Views: 1342

Answers (1)

Roy2012
Roy2012

Reputation: 12523

Shared variable solution

I changed data_received and main and as follows:

def data_received(self, data):
    extra = self.transport.get_extra_info("peername")
    print( f"Data received: {data}, {extra}" )
    self.main_buffer.extend(data) # share the data with the rest of the world

in main, I'm printing the data when I know it's there:

try:
    await on_con_lost
    print('awaiting for con lost')
    print(main_buffer)
except Exception as e:
    print(e)

That's one way to transfer data between the protocol factory and the rest of the application - but there are other, more elegant ways.

One such way is to give the factory a callback to call whenever data comes in. Then, instead of printing the data or appending it to the buffer, data_received will call this callback.

Callback solution.

Here's the code of the callback solution:

class ProtocolFactoryTCP(Transport, Protocol):

    def __init__(self,  on_con_lost, main_buffer, callback):
        super().__init__()
        self.on_con_lost = on_con_lost
        self.transport = None
        self.callback = callback
        self.main_buffer = main_buffer  

    def data_received(self, data):
        extra = self.transport.get_extra_info("peername")
        print( f"Data received: {data}, {extra}" )
        self.main_buffer.extend(data)
        self.callback(self.transport, data) 

...

def my_callback(transport, data):
    print(f"in the callback. data is {data}")
    print('writing transport', transport.write(b"hello, again")) 

... 

async def main():

    ...
    connection = ProtocolFactoryTCP(on_con_lost, main_buffer, my_callback)

To make it work, you'll also have to comment out / remove the two lines in the server, to prevent it from closing the connection immediately:

#        print('Close the client socket')
#        self.transport.close()

An async callback solution

A variant of the above would be to make my_callback async. To demo that, change the following two lines:

async def my_callback(transport, data):
===== Make my_callback async 

In data_received, instead of just calling my_callback, schedule it as a task:

asyncio.create_task(self.callback(self.transport, data))

Upvotes: 1

Related Questions