Aviran

Reputation: 5458

Controlling the feed of incoming bytes using Twisted

I need to address the following issue.

As a client, I connect to a server that sends blocks of data in the following form:

[4 bytes][msg - block of bytes the size of int(previous 4 bytes)]
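
For illustration, here is a minimal sketch that builds and parses one frame in this layout. It assumes the 4-byte prefix is an unsigned big-endian integer (the byte order is not stated above), and encode_frame and decode_frame are hypothetical helper names.

```python
import struct

HEADER_LENGTH = 4  # size of the length prefix in bytes


def encode_frame(msg: bytes) -> bytes:
    # Assumption: the prefix is an unsigned 32-bit big-endian integer.
    return struct.pack(">I", len(msg)) + msg


def decode_frame(frame: bytes) -> bytes:
    # Read the prefix, then slice out exactly that many bytes of payload.
    (length,) = struct.unpack(">I", frame[:HEADER_LENGTH])
    return frame[HEADER_LENGTH:HEADER_LENGTH + length]


frame = encode_frame(b"hello")
```

A 5-byte message therefore travels as 9 bytes on the wire, and TCP may deliver those 9 bytes in any number of pieces.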

When using Twisted, I need dataReceived(self, data) to be called with the msg part. I don't mind also receiving the 4-byte prefix, but I need to make sure I get the entire message block in one piece, not fragmented across several calls.

Please advise.

Upvotes: 0

Views: 140

Answers (2)

Aviran

Reputation: 5458

I ended up writing the following custom receiver:

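import logging

from twisted.internet.protocol import Protocol

logger = logging.getLogger(__name__)

HEADER_LENGTH = 4

class CustomReceiver(Protocol):
    _buffer = b''

    def dataReceived(self, data):
        logger.info(f'DATA RECEIVED: {data}')

        # Prepend whatever was left over from the previous call.
        data = self._buffer + data
        # Deliver every complete message currently available.
        while len(data) >= HEADER_LENGTH:
            response_length = int.from_bytes(data[:HEADER_LENGTH], byteorder='big')
            end = HEADER_LENGTH + response_length
            if len(data) < end:
                # The body is incomplete; wait for more bytes.
                break
            self.responseReceived(data[HEADER_LENGTH:end])
            data = data[end:]

        # Keep the partial header or partial message for the next call.
        self._buffer = data

    def responseReceived(self, response):
        # Application-specific handling of one complete message.
        raise NotImplementedError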

I'm not sure whether I should add a locking mechanism for dataReceived(); simultaneous invocations would corrupt the _buffer data.
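
On the locking concern: Twisted normally calls dataReceived from the single reactor thread, one call at a time per connection, so a lock should only matter if your own code touches _buffer from other threads. The buffering logic can also be exercised without a network connection. The sketch below is a stdlib-only stand-in (FrameBuffer and feed are hypothetical names, not Twisted API) that assumes a big-endian prefix and feeds a stream in chunks that split both headers and bodies.

```python
HEADER_LENGTH = 4


class FrameBuffer:
    """Hypothetical helper: accumulates bytes and returns complete messages."""

    def __init__(self):
        self._buffer = b""

    def feed(self, data: bytes) -> list:
        self._buffer += data
        messages = []
        while len(self._buffer) >= HEADER_LENGTH:
            length = int.from_bytes(self._buffer[:HEADER_LENGTH], byteorder="big")
            end = HEADER_LENGTH + length
            if len(self._buffer) < end:
                break  # body not fully received yet
            messages.append(self._buffer[HEADER_LENGTH:end])
            self._buffer = self._buffer[end:]
        return messages


fb = FrameBuffer()
stream = b"\x00\x00\x00\x03abc\x00\x00\x00\x02hi"
# Feed the stream in awkward chunks that split both header and body.
chunks = [stream[:2], stream[2:6], stream[6:12], stream[12:]]
received = [msg for chunk in chunks for msg in fb.feed(chunk)]
```

Each message comes out whole regardless of where the chunk boundaries fall, which is the property the receiver needs.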

Upvotes: 0

Jean-Paul Calderone

Reputation: 48315

StatefulProtocol is helpful for protocols like this.

from twisted.protocols.stateful import StatefulProtocol

HEADER_LENGTH = 4

class YourProtocol(StatefulProtocol):

    # Define the first handler and what data it expects.
    def getInitialState(self):
        return (
            # The first handler is self._header
            self._header, 
            # And it expects HEADER_LENGTH (4) bytes
            HEADER_LENGTH,
        )

    # When HEADER_LENGTH bytes have been received, this is called.
    def _header(self, data):
        # It returns a tuple representing the next state handler.
        return (
            # The next thing we can handle is a response
            self._response, 
            # And the response is made up of this many bytes.
            int.from_bytes(data, byteorder='big'),
        )

    # When the number of bytes from the header has been received,
    # this is called.
    def _response(self, data):
        # Application dispatch of the data
        self.responseReceived(data)
        # Return to the initial state to process the next received data.
        return self.getInitialState()
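
To make the (handler, byte count) contract concrete, here is a small stdlib-only sketch that imitates how such a state machine could be driven. MiniStateful and its method names are hypothetical stand-ins written for illustration, not Twisted's actual implementation, and a big-endian prefix is assumed.

```python
HEADER_LENGTH = 4


class MiniStateful:
    """Hypothetical stand-in that imitates the (handler, size) state contract."""

    def __init__(self):
        self._buffer = b""
        self._handler, self._needed = self.get_initial_state()
        self.responses = []

    def get_initial_state(self):
        return self._header, HEADER_LENGTH

    def _header(self, data):
        # Next state: read as many bytes as the header announces.
        return self._response, int.from_bytes(data, byteorder="big")

    def _response(self, data):
        self.responses.append(data)
        return self.get_initial_state()

    def data_received(self, data):
        self._buffer += data
        # Call the current handler each time enough bytes are buffered.
        while len(self._buffer) >= self._needed:
            chunk = self._buffer[:self._needed]
            self._buffer = self._buffer[self._needed:]
            self._handler, self._needed = self._handler(chunk)


proto = MiniStateful()
for piece in (b"\x00\x00", b"\x00\x05he", b"llo\x00\x00\x00\x01", b"!"):
    proto.data_received(piece)
```

Each handler only ever sees exactly the number of bytes the previous state asked for, so the message handler never has to deal with fragments.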

Upvotes: 1
