Reputation: 5458
I need to address the following issue.
As a client, I connect to a server, and the server sends blocks of data in the following form:
[4 bytes][msg - block of bytes the size of int(previous 4 bytes)]
When using Twisted, I need dataReceived(self, data) to be called with the msg part. I don't mind also receiving the 4-byte prefix, but I need to make sure I get the entire message block in one piece, not fragmented across several calls.
Please advise.
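For concreteness, the wire format described above might look like the sketch below. The big-endian byte order and the helper name encode_frame are assumptions made for illustration; the server's actual byte order is not stated here. The chunk boundaries are arbitrary and only mimic how a TCP stream can be split without regard to message boundaries.

```python
import struct


def encode_frame(msg: bytes) -> bytes:
    """Prefix msg with its length as a 4-byte unsigned int (big-endian assumed)."""
    return struct.pack(">I", len(msg)) + msg


# Two messages back to back, as the server would send them.
stream = encode_frame(b"hello") + encode_frame(b"world!")

# TCP preserves byte order but not message boundaries, so the same
# stream may reach dataReceived() in pieces like these.
chunks = [stream[:3], stream[3:11], stream[11:]]
```

Here the first chunk holds only part of a header, and the second holds the rest of the first message plus a partial second header, which is why some buffering on the receiving side is needed.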
Upvotes: 0
Views: 140
Reputation: 5458
I ended up writing the following custom receiver:

    import logging

    from twisted.internet.protocol import Protocol

    logger = logging.getLogger(__name__)

    HEADER_LENGTH = 4

    class CustomReceiver(Protocol):
        _buffer = b''

        def dataReceived(self, data):
            logger.info(f'DATA RECEIVED: {data}')
            # Prepend whatever was left over from the previous call.
            data = self._buffer + data
            # Deliver every complete message currently available.
            while len(data) >= HEADER_LENGTH:
                response_length = int.from_bytes(data[:HEADER_LENGTH], byteorder='big')
                end = HEADER_LENGTH + response_length
                if len(data) < end:
                    # The body is still incomplete; wait for more data.
                    break
                self.responseReceived(data[HEADER_LENGTH:end])
                data = data[end:]
            # Keep any partial header or partial message for the next call.
            self._buffer = data
I'm not sure whether I should add a locking mechanism to dataReceived(); simultaneous invocations would corrupt the _buffer data.
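The buffering logic can be tried out without a running reactor. Below is a minimal sketch, assuming a hypothetical helper class FrameBuffer (not part of Twisted) and a big-endian length prefix. It only hands back a message once both the header and the full body have arrived, and it keeps any partial tail for the next call.

```python
class FrameBuffer:
    """Accumulates bytes and returns complete length-prefixed messages.

    Hypothetical helper for illustration; not a Twisted class.
    """

    HEADER_LENGTH = 4

    def __init__(self):
        self._buffer = b""

    def feed(self, data: bytes) -> list:
        """Add data to the buffer and return every message now complete."""
        self._buffer += data
        messages = []
        while len(self._buffer) >= self.HEADER_LENGTH:
            header = self._buffer[:self.HEADER_LENGTH]
            length = int.from_bytes(header, byteorder="big")
            end = self.HEADER_LENGTH + length
            if len(self._buffer) < end:
                # Header is complete but the body is not; wait for more data.
                break
            messages.append(self._buffer[self.HEADER_LENGTH:end])
            self._buffer = self._buffer[end:]
        return messages


fb = FrameBuffer()
stream = b"\x00\x00\x00\x05hello\x00\x00\x00\x06world!"
# Feed the stream in three arbitrary fragments.
results = [fb.feed(stream[:3]), fb.feed(stream[3:11]), fb.feed(stream[11:])]
# results == [[], [b"hello"], [b"world!"]]
```

On the locking question: Twisted normally invokes dataReceived() from the reactor thread, one call at a time per connection, so a buffer like this should not need a lock unless the application itself touches it from other threads.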
Upvotes: 0
Reputation: 48315
StatefulProtocol is helpful for protocols like this.
    from twisted.protocols.stateful import StatefulProtocol

    HEADER_LENGTH = 4

    class YourProtocol(StatefulProtocol):
        # Define the first handler and what data it expects.
        def getInitialState(self):
            return (
                # The first handler is self._header
                self._header,
                # And it expects HEADER_LENGTH (4) bytes
                HEADER_LENGTH,
            )

        # When HEADER_LENGTH bytes have been received, this is called.
        def _header(self, data):
            # It returns a tuple representing the next state handler.
            return (
                # The next thing we can handle is a response
                self._response,
                # And the response is made up of this many bytes.
                int.from_bytes(data, byteorder='big'),
            )

        # When the number of bytes from the header has been received,
        # this is called.
        def _response(self, data):
            # Application dispatch of the data
            self.responseReceived(data)
            # Return to the initial state to process the next received data.
            return self.getInitialState()
Upvotes: 1