user10335115

Reputation: 3

Asyncio Fatal error: protocol.data_received() call failed

When the client sends a large message (~5300 bytes), asyncio receives only 2917-2923 bytes and then raises this error. I'm using Python 3.7. The received data ends like this:

{"item":4,"active":false,"num":2,"turn_touch":0,"damaged_in_turn":0},{"item":7,"active":fal

And of course json cannot parse this truncated message.

Error log

ERROR:asyncio:Fatal error: protocol.data_received() call failed.
protocol: <__main__.Server object at 0xb5a2e0ac>
transport: <_SelectorSocketTransport fd=9 read=polling write=<idle, bufsize=0>>
Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/selector_events.py", line 824, in _read_ready__data_received
    self._protocol.data_received(data)
  File "/home/den/piratebay/server.py", line 41, in data_received
    self.message_handle.check_message_id(self, data)
  File "/home/den/piratebay/message_handle.py", line 25, in check_message_id
    self.parsed_data = parse_data(data)
  File "/home/den/piratebay/action.py", line 50, in parse_data
    return json.loads(data)
  File "/usr/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib/python3.7/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 2918 (char 2917)

server.py

class Server(asyncio.Protocol):
    def connection_made(self, transport):
        """ Called on instantiation, when new client connects """
        logging.debug('connection_made')
        self.transport = transport
        self.addr = transport.get_extra_info('peername')
        s = transport.get_extra_info("socket")
        print('Connection from {}'.format(self.addr))


    def data_received(self, data):
        """ Handle data as it's received.  """
        logging.debug('received {} bytes'.format(len(data)))
        data = data.decode()
        print('received data->',data)
        self.message_handle.check_message_id(self, data)


    def connection_lost(self, ex):
        """ Called on client disconnect. Clean up client state """       
        self.transport.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # Create server and initialize on the event loop
    coroutine = loop.create_server(Server,
                                   host=HOST,
                                   port=PORT)

What is causing this error? Thanks in advance.

Upvotes: 0

Views: 1896

Answers (1)

user4815162342

Reputation: 155610

There is no guarantee that a single data_received will receive the whole application-level message. TCP is a stream-based protocol that doesn't even provide the concept of a message, just a stream of bytes. A single write on the client side can be split into multiple packets and arrive in multiple reads by the server. Conversely, multiple writes may be coalesced into a single packet and received as a single unit.

A correctly written asyncio program cannot assume that all data will arrive in a single data_received call. Instead, data_received must collect the incoming data, recognize when the message is complete, and only then process it. How the end of a message is recognized depends on the protocol used; for example, HTTP provides the option to declare the content length up-front. If the client is expected to disconnect after sending the message, the end of the message is recognized by encountering the end-of-file condition.

In the latter case, data_received would collect the bytes, which would be processed by eof_received:

import asyncio, io

class Server(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        addr = transport.get_extra_info('peername')
        print('Connection from {}'.format(addr))
        self._data = io.BytesIO()

    def data_received(self, data):
        print('received data->',data)
        self._data.write(data)

    def eof_received(self):
        print('received EOF')
        # All data was already collected by data_received
        data = self._data.getvalue()
        data = data.decode()
        self.message_handle.check_message_id(self, data)

    def connection_lost(self, ex):
        self.transport.close()
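
If the client instead keeps the connection open and sends several messages, EOF cannot mark the end of a message, so the protocol needs explicit framing. Here is a minimal sketch, assuming (this is not stated in the question) that the client terminates each JSON message with a newline. JsonLineServer and on_message are hypothetical names used only for illustration:

```python
import asyncio
import json


class JsonLineServer(asyncio.Protocol):
    """Hypothetical protocol: one JSON document per newline-terminated line."""

    def __init__(self, on_message):
        self._buffer = bytearray()      # bytes received but not yet parsed
        self._on_message = on_message   # callback invoked per complete message

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        # A chunk may hold part of a message, exactly one, or several.
        self._buffer.extend(data)
        while True:
            newline = self._buffer.find(b'\n')
            if newline == -1:
                break  # incomplete message; wait for more data
            line = bytes(self._buffer[:newline])
            del self._buffer[:newline + 1]
            if line:
                self._on_message(json.loads(line.decode()))
```

This only works if the client sends compact JSON: json.dumps output contains no raw newlines unless indent is used.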

Note that code like the above is much more straightforward to write using the stream-based API, which is the recommended way to write modern asyncio code:

import asyncio

async def handle_client(r, w, message_handle):
    data = await r.read()  # read until EOF
    data = data.decode()
    message_handle.check_message_id(data)
    # ...

loop = asyncio.get_event_loop()
message_handle = ...
server = loop.run_until_complete(
    asyncio.start_server(
        lambda r, w: handle_client(r, w, message_handle),
        '127.0.0.1', 8888))
loop.run_forever()
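
For a connection that stays open, the stream API can also read length-prefixed messages with readexactly, in the same spirit as HTTP's content length. This is a sketch only, assuming a hypothetical wire format in which the client sends a 4-byte big-endian length before each JSON body. read_message is an illustrative helper, not part of asyncio:

```python
import asyncio
import json
import struct


async def read_message(reader):
    """Return the next decoded JSON message, or None on a clean EOF."""
    try:
        header = await reader.readexactly(4)
    except asyncio.IncompleteReadError as exc:
        if exc.partial:
            raise  # connection dropped in the middle of a header
        return None  # EOF between messages: the client is done
    (length,) = struct.unpack('>I', header)
    # readexactly waits until the whole body has arrived,
    # regardless of how TCP split it into packets.
    body = await reader.readexactly(length)
    return json.loads(body.decode())
```

Inside handle_client this would be awaited in a loop until it returns None.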

Upvotes: 1
