Reputation: 5228
I'm following the TCP Client/Server example, but I'm having trouble piping the results from data_received into my other functions for further processing.
By reproducing their example in my FactoryTCP.py file and calling loop.create_connection from my main.py file, I am able to establish a connection to the server, send one message, and get the reply.
However, this reply is printed from FactoryTCP.py and is not streamed / piped into main.py. Ideally, I would like the received data to be streamed / piped into a bytearray so that I can manipulate it further.
This is an example of what I have:
FactoryTCP.py
from asyncio import Protocol, Transport


class ProtocolFactoryTCP(Transport, Protocol):
    def __init__(self, on_con_lost, main_buffer):
        super().__init__()
        self.on_con_lost = on_con_lost
        self.transport = None
        self.main_buffer = main_buffer  # I had thought that by initializing this,
                                        # I could extend into it and read the data from `main.py`.
                                        # But this is not the case.

    def connection_made(self, transport):
        self.transport = transport

    def connection_lost(self, exc):
        self.on_con_lost.set_result(True)
        print('The server | client closed the connection')

    def data_received(self, data):
        print(f'Data received: {data}, {self.transport.get_extra_info("peername")}')

    def eof_received(self):
        print("end of file received.")
main.py
import asyncio

from FactoryTCP import ProtocolFactoryTCP


async def main():
    main_buffer = bytearray()
    on_con_lost = event_loop.create_future()
    connection = ProtocolFactoryTCP(on_con_lost, main_buffer)
    transport, protocol = await event_loop.create_connection(lambda: connection, "peer_ip", "peer_port")
    print('reading transport', transport.is_reading())
    # message and extra_msg are bytes payloads defined elsewhere (not shown).
    print('writing transport', transport.write(message))  # Just a test to see if I can write from outside. It works.
    print('writing transport', transport.write(extra_msg))  # Just a test to see if I can write from outside. It works.
    try:
        await on_con_lost
        print('awaiting for con lost')
    except Exception as e:
        print(e)


event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(main())
With this current code, I am able to send messages from outside ProtocolFactoryTCP, but how do I get my hands on the data passed to data_received outside ProtocolFactoryTCP as well? I want to create different handlers and functions that process the result and respond to it while the connection is still open.
Upvotes: 0
Views: 1342
Reputation: 12523
I changed data_received and main as follows:
def data_received(self, data):
    extra = self.transport.get_extra_info("peername")
    print(f"Data received: {data}, {extra}")
    self.main_buffer.extend(data)  # share the data with the rest of the world
In main, I'm printing the data when I know it's there:
try:
    await on_con_lost
    print('awaiting for con lost')
    print(main_buffer)
except Exception as e:
    print(e)
That's one way to transfer data between the protocol factory and the rest of the application - but there are other, more elegant ways.
One such way is to give the factory a callback to call whenever data comes in. Then, rather than only printing the data or appending it to the buffer, data_received will also hand each chunk to this callback.
Here's the code of the callback solution:
class ProtocolFactoryTCP(Transport, Protocol):
    def __init__(self, on_con_lost, main_buffer, callback):
        super().__init__()
        self.on_con_lost = on_con_lost
        self.transport = None
        self.callback = callback
        self.main_buffer = main_buffer

    def data_received(self, data):
        extra = self.transport.get_extra_info("peername")
        print(f"Data received: {data}, {extra}")
        self.main_buffer.extend(data)
        self.callback(self.transport, data)

...

def my_callback(transport, data):
    print(f"in the callback. data is {data}")
    print('writing transport', transport.write(b"hello, again"))

...

async def main():
    ...
    connection = ProtocolFactoryTCP(on_con_lost, main_buffer, my_callback)
To make it work, you'll also have to comment out / remove the two lines in the server, to prevent it from closing the connection immediately:
# print('Close the client socket')
# self.transport.close()
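To try the callback approach without a server, the sketch below drives a protocol by hand. CallbackProtocol, FakeTransport and reply_callback are hypothetical names made up for illustration, not part of asyncio or of the code above; in a real program the event loop calls connection_made and data_received for you.

```python
import asyncio


class CallbackProtocol(asyncio.Protocol):
    """Hypothetical protocol that forwards every received chunk to a callback."""

    def __init__(self, main_buffer, callback):
        self.transport = None
        self.main_buffer = main_buffer
        self.callback = callback

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.main_buffer.extend(data)        # mutates the same object the caller holds
        self.callback(self.transport, data)  # hand the chunk to application code


class FakeTransport:
    """Stand-in for a real transport: records writes instead of sending them."""

    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)


def reply_callback(transport, data):
    # Application-level handler: reply while the connection is still open.
    transport.write(b"ack:" + data)


main_buffer = bytearray()
protocol = CallbackProtocol(main_buffer, reply_callback)
fake = FakeTransport()

# Simulate what the event loop would do on a live connection.
protocol.connection_made(fake)
protocol.data_received(b"hello")
protocol.data_received(b"world")
```

After the two simulated chunks, main_buffer holds everything that was received and fake.written holds the replies the callback produced, which shows that both the shared buffer and the callback reach code outside the protocol class.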
A variant of the above would be to make my_callback async. To demo that, change the following two lines.
Make my_callback async:
async def my_callback(transport, data):
In data_received, instead of just calling my_callback, schedule it as a task:
asyncio.create_task(self.callback(self.transport, data))
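One detail worth adding when scheduling tasks this way: the event loop keeps only weak references to tasks, so the asyncio documentation recommends holding a reference until the task finishes. A minimal sketch of that, again driven by hand with hypothetical names (AsyncCallbackProtocol, FakeTransport, upper_callback) rather than a real connection:

```python
import asyncio


class AsyncCallbackProtocol(asyncio.Protocol):
    """Hypothetical protocol that schedules an async callback per chunk."""

    def __init__(self, callback):
        self.transport = None
        self.callback = callback
        self.tasks = set()  # strong references to in-flight tasks

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        # data_received runs inside the event loop, so create_task is allowed here.
        task = asyncio.create_task(self.callback(self.transport, data))
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)  # drop the reference when done


class FakeTransport:
    """Stand-in for a real transport: records writes instead of sending them."""

    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)


async def upper_callback(transport, data):
    await asyncio.sleep(0)  # placeholder for real asynchronous work
    transport.write(data.upper())


async def demo():
    protocol = AsyncCallbackProtocol(upper_callback)
    fake = FakeTransport()
    protocol.connection_made(fake)
    protocol.data_received(b"ping")
    protocol.data_received(b"pong")
    await asyncio.gather(*protocol.tasks)  # wait for the scheduled callbacks
    return fake.written


result = asyncio.run(demo())
```

Because data_received returns immediately after scheduling, a slow callback does not block the protocol from receiving the next chunk; the trade-off is that callbacks for different chunks may overlap, so any ordering requirements have to be handled in the callback itself.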
Upvotes: 1