Reputation: 2423
I have a piece of code which most of the time skips packets when many packets arrive at almost the same time from the same source. The packets are built so that, a packet size field is attached at the beginning which has the size of the packet after it, in bytes.
The TCP client thread is ran with 10ms interval.
unit AgThread11;
interface
uses
SysUtils, Classes, Windows, Rtti;
type
TAgThreadMethod1 = procedure of object;
TAgThreadMethod2 = procedure;
TAgThread = class ( TThread )
private
fInterval : Cardinal;
fTerminateEvent : THandle;
fRun : Boolean;
fOnRun1 : TAgThreadMethod1;
fOnRun2 : TAgThreadMethod2;
protected
procedure Execute; override;
public
constructor Create
( const AOnRun: TAgThreadMethod1; const AInterval: Cardinal;
const ARun: Boolean = True ); overload;
constructor Create
( const AOnRun: TAgThreadMethod2; const AInterval: Cardinal;
const ARun: Boolean = True ); overload;
destructor Destroy; override;
procedure Signal;
property Run : Boolean read fRun write fRun;
property Interval : Cardinal read fInterval write fInterval;
property OnRun1 : TAgThreadMethod1 read fOnRun1 write fOnRun1;
property OnRun2 : TAgThreadMethod2 read fOnRun2 write fOnRun2;
end;
implementation
constructor TAgThread.Create
( const AOnRun: TAgThreadMethod1; const AInterval: Cardinal;
const ARun: Boolean = True );
begin
fTerminateEvent := CreateEvent ( nil, TRUE, FALSE, nil );
fInterval := AInterval;
fRun := ARun;
fOnRun1 := AOnRun;
inherited Create ( False );
end;
constructor TAgThread.Create
( const AOnRun: TAgThreadMethod2; const AInterval: Cardinal;
const ARun: Boolean = True );
begin
fTerminateEvent := CreateEvent ( nil, TRUE, FALSE, nil );
fInterval := AInterval;
fRun := ARun;
fOnRun2 := AOnRun;
inherited Create ( False );
end;
destructor TAgThread.Destroy;
begin
Terminate;
Signal;
WaitFor;
inherited;
end;
procedure TAgThread.Signal;
begin
SetEvent ( FTerminateEvent );
end;
procedure TAgThread.Execute;
begin
while not Terminated do
begin
if fRun then
if Assigned ( fOnRun1 ) then fOnRun1
else if Assigned ( fOnRun2 ) then fOnRun2;
WaitForSingleObject ( FTerminateEvent, fInterval );
end;
end;
end.
procedure TForm1.THEX_TCP;
var
Buffer : TBytes;
MsgSize : Integer;
begin
if TCPClient.IOHandler.CheckForDataOnSource then
begin
while TCPClient.IOHandler.InputBuffer.Size >= 4 do
begin
fRXCount := fRXCount + 1;
TCPClient.IOHandler.InputBuffer.ExtractToBytes ( Buffer, 4 );
Move ( Buffer [0], MsgSize, 4 );
TCPClient.IOHandler.InputBuffer.ExtractToBytes ( Buffer, MsgSize, False );
NAT.RecievedNATData ( Buffer ); // Packet Processor
end;
end;
end;
What should I do to ensure zero packet loss?
Upvotes: 0
Views: 766
Reputation: 598001
There are two major problems with your TCP reading code:
You are not ensuring that the InputBuffer
actually has MsgSize
number of bytes available before calling ExtractToBytes()
the second time. If you try to extract more bytes than are actually in the buffer, ExtractToBytes()
raises an exception.
more importantly, you are not resizing your Buffer
variable back to 0 before each call to ExtractToBytes()
. After the first call in the first loop iteration, the length of the Buffer
is 4 bytes. If that message is less than 4 bytes in size, you are leaving behind random bytes at the end of your Buffer
that are being passed on to your parser and likely corrupts its logic. But worse, if there is another message size in the buffer, your next loop iteration makes a 3rd call to ExtractToBytes()
and appends those 4 bytes to the end of the existing Buffer
content, not replaces the content like you are assuming (the AAppend
parameter of ExtractToBytes()
is True by default). Thus, you end up copying 4 bytes from the previous message data into your MsgSize
variable instead of the new 4 bytes you just extracted, so you are using a corrupted MsgSize
value on the next ExtractToBytes()
call.
Because your packets are length-prefixed, you do not need to use CheckForDataOnSource()
or access the InputBuffer
directly at all. Use the following code and let Indy do the work for you:
procedure TForm1.THEX_TCP;
var
Buffer : TBytes;
MsgSize : Integer;
begin
MsgSize := TCPClient.IOHandler.ReadLongInt;
TCPClient.IOHandler.ReadBytes(Buffer, MsgSize);
Inc(fRXCount);
NAT.RecievedNATData(Buffer);
end;
By default, that will block the caller until data is available to read. If THEX_TCP
needs to exit when there is no data ready to be read, use something like this instead:
procedure TForm1.THEX_TCP;
var
Buffer : TBytes;
MsgSize : Integer;
begin
if TCPClient.IOHandler.InputBufferIsEmpty then
begin
TCPClient.IOHandler.CheckForDataOnSource;
TCPClient.IOHandler.CheckForDisconnect;
if TCPClient.IOHandler.InputBufferIsEmpty then Exit;
end;
repeat
MsgSize := TCPClient.IOHandler.ReadLongInt;
TCPClient.IOHandler.ReadBytes(Buffer, MsgSize);
Inc(fRXCount);
NAT.RecievedNATData(Buffer);
SetLength(Buffer, 0);
until TCPClient.IOHandler.InputBufferIsEmpty;
end;
The only gotcha with this approach is that ReadLongInt()
and ReadBytes()
might read more bytes into the InputBuffer
, so your loop could potentially running for a long time if a lot of data is being sent in a short amount of time. If you absolutely must read only one buffer at a time and only process complete messages then use something like this:
procedure TForm1.THEX_TCP;
var
MsgSizeBuffer: array[0..3] of Byte;
MsgSize, I : Integer;
Buffer : TBytes;
begin
TCPClient.IOHandler.CheckForDataOnSource;
TCPClient.IOHandler.CheckForDisconnect;
while TCPClient.IOHandler.InputBuffer.Size >= 4 do
begin
// unfortunately, TIdBuffer does not have a way to peek
// multiple bytes at a time without removing them
for I := 0 to 3 do
MsgSizeBuffer[I] := TCPClient.IOHandler.InputBuffer.Peek(I);
Move(MsgSizeBuffer[0], MsgSize, 4);
MsgSize := LongInt(GStack.NetworkToHost(LongWord(MsgSize)));
if TCPClient.IOHandler.InputBuffer.Size < (4+MsgSize) then
Break;
TCPClient.IOHandler.InputBuffer.Remove(4);
TCPClient.IOHandler.InputBuffer.ExtractToBytes(Buffer, MsgSize);
Inc(fRXCount);
NAT.RecievedNATData(Buffer);
SetLength(Buffer, 0);
end;
end;
Upvotes: 2