Iman Navidi

Reputation: 33

Handling receiving multiple messages / multiple chunks when using websockets in .NET

I'm implementing a websocket server and using binary data (not text) to communicate between the server and clients. When I wait for a message and then process it, do I need to worry about a single message arriving in multiple chunks, or about two messages being received at the same time, when I use the following code?

var buffer = new byte[256];
var result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);

*I'm sure all messages sent by (valid) clients/server fit within the 256-byte buffer, and the EndOfMessage flag is set to true when calling webSocket.SendAsync().

If yes, I would be grateful to see a best practice code that handles receiving messages properly when there is an "await Process(byte[] data)" method that handles only one message at a time.

I used the following code to handle messages that arrive in multiple chunks, but I'm not sure whether it is overengineering.

while (socket.State == WebSocketState.Open)
{
    var buffer = new byte[256];
    await using var dataStream = new MemoryStream();
    WebSocketReceiveResult result;
    do
    {
        result = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
        if (result.MessageType == WebSocketMessageType.Close)
        {
            await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
            return;
        }
        await dataStream.WriteAsync(buffer);
    } while (!result.EndOfMessage);
    await Process(dataStream.ToArray()); //Process functions do not detect multiple messages, and just can process one message at a time
    dataStream.SetLength(0); //Clear memory stream data
}

Upvotes: 0

Views: 983

Answers (1)

Ryan

Reputation: 746

I would be grateful to see a best practice code that handles receiving messages properly

Not sure about best practice, but taking the comments above into account, I would refactor it like this:

var buffer = new Memory<byte>(new byte[256]);

while (socket.State == WebSocketState.Open)
{
    ValueWebSocketReceiveResult result;
    var received = 0; // bytes of the current message accumulated so far
    do
    {
        // Receive into the unused tail of the buffer so that a later chunk
        // does not overwrite an earlier chunk of the same message.
        // Assumes a message never exceeds the buffer size.
        result = await socket.ReceiveAsync(buffer[received..], CancellationToken.None);
        if (result.MessageType == WebSocketMessageType.Close)
        {
            await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
            return;
        }
        received += result.Count;
    }
    while (!result.EndOfMessage);
    //Process functions do not detect multiple messages, and just can process one message at a time
    await Process(buffer[..received]); // range op for Memory<byte> is quite cheap
}

So, changes made:

  1. byte[] is replaced with Memory<byte> because the range operator (e.g. buffer[..n]) on Memory<byte> produces a slice over the same memory, whereas on byte[] it allocates a new array and copies into it.

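  2. The other overload of socket.ReceiveAsync() is used. It returns ValueTask<ValueWebSocketReceiveResult> rather than Task<WebSocketReceiveResult>; both ValueTask and the result type are structs, so a receive that completes synchronously needs no heap allocation for them, afaik.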

  3. A message may be shorter than the 256-byte buffer, so only the bytes actually received (result.Count) must be passed on, not the whole buffer.

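  4. Interesting part: because each message is fully processed before the next receive starts, the same Memory<byte> buffer can be reused, and there is no need for MemoryStream at all (as long as a message never exceeds the buffer).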

    4.2. If we want more throughput from this loop by not awaiting Process(..), we can't get rid of MemoryStream: the receive buffer is reused immediately, so each message has to be copied into its own allocation before it is handed to Process(..).
    The code would be a bit different:

    var buffer = new Memory<byte>(new byte[256]);
    await using var dataStream = new MemoryStream();
    
    while (socket.State == WebSocketState.Open)
    {
        ValueWebSocketReceiveResult result;
        do
        {
            result = await socket.ReceiveAsync(buffer, CancellationToken.None);
            if (result.MessageType == WebSocketMessageType.Close)
            {
                await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
                return;
            }
            await dataStream.WriteAsync(buffer[..result.Count]);
        } while (!result.EndOfMessage);
        //Process asynchronously, so we don't wait 
        //but have to handle all of the branches there: normal/error/cancelled
        // have to make extra allocation for data copy
        _ = Process(dataStream.ToArray());
        dataStream.SetLength(0); //Clear memory stream data
    }
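
The comment in the code above says the detached Process call still has to handle the normal/error/cancelled branches. A minimal sketch of one way to do that follows; Detached and ObserveAsync are hypothetical names introduced here for illustration, not part of the code above or of any library:

```csharp
using System;
using System.Threading.Tasks;

static class Detached
{
    // Hypothetical helper: awaits a detached task and reports each outcome
    // through a callback, so a failure is not silently lost.
    public static async Task ObserveAsync(Task work, Action<string> report)
    {
        try
        {
            await work.ConfigureAwait(false);
            report("completed");
        }
        catch (OperationCanceledException)
        {
            // TaskCanceledException derives from OperationCanceledException.
            report("cancelled");
        }
        catch (Exception ex)
        {
            report("failed: " + ex.Message);
        }
    }
}
```

In the loop it would be used as _ = Detached.ObserveAsync(Process(dataStream.ToArray()), Console.Error.WriteLine);. Keep in mind that once Process is not awaited, two Process calls may run at the same time, which matters if Process can only handle one message at a time.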
    

PS: If you know your message sizes (that is, your protocol guarantees them, as in this simple case), it is possible to drop the while (!result.EndOfMessage); loop. That would be quite unreliable, since a single message can still arrive in several chunks; I mention it only as the highest-throughput case.
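
On point 1 above, the difference between a range on byte[] and a range on Memory<byte> can be seen in a small standalone sketch. SliceDemo is a hypothetical name used only for this illustration:

```csharp
using System;

static class SliceDemo
{
    // Reports whether a write through each kind of slice is visible in the source array.
    public static (bool ArrayShares, bool MemoryShares) Run()
    {
        var source = new byte[] { 1, 2, 3, 4 };

        // A range on an array allocates a new array and copies the elements.
        byte[] arraySlice = source[..2];
        arraySlice[0] = 99;
        bool arrayShares = source[0] == 99;

        // A range on Memory<byte> is a view over the same underlying array.
        Memory<byte> memory = source;
        Memory<byte> memorySlice = memory[..2];
        memorySlice.Span[0] = 42;
        bool memoryShares = source[0] == 42;

        return (arrayShares, memoryShares);
    }
}
```

Because the Memory<byte> slice is only a view, the data passed to Process must not be kept around after the next receive has started writing into the same buffer.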

Upvotes: 2
