Reputation: 69
When using MS Quic in .NET 8 to stream data from server PC to client PC in local WiFi network, I observed that some of the received streams in client are corrupted, but most arrives correctly. Stream is a byte[] of size of ca. 15 KB, where first 50 byte is timeStamp when stream is written to client, second 50 byte is id of packet (counter) and rest ca. 15.000 bytes are .png converted to byte.
Schema of what happens in the program:
Solution The problem was that when reading the stream there was no check if the expected length was reached. I added the chage to the orginal code. I tested the change and it is working as expected without loosing any packages.
public async Task ReceiveStreamAndSendFeedback()
{
int msgCounter = 0;
int receiveArrayLength = 50 + 50 + 15284;
try
{
if (null == _incomingStream)
{
_incomingStream = await _connection.AcceptInboundStreamAsync();
}
if (null == _outgoingStream)
{
_outgoingStream = await _connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional);
}
//byte[] buffer = new byte[receiveArrayLength];
bool isRunning = true;
bool isStreamStartAnnounced = false;
while (isRunning)
{
byte[] buffer = new byte[receiveArrayLength];
int streamLength = 0;
while (streamLength < receiveArrayLength)
{
streamLength += await _incomingStream.ReadAsync(buffer, streamLength, receiveArrayLength - streamLength);
}
//Previous way of reading bytes
//streamLength = await _incomingStream.ReadAsync(buffer, 0, receiveArrayLength);
//the same code as previously
My code with server and client:
Server:
Program.cs
QuicServerPrototype quicServerPrototype = new QuicServerPrototype();
await quicServerPrototype.Initi();
List<Task> tasks = new List<Task>();
tasks.Add(quicServerPrototype.SendStream());
tasks.Add(quicServerPrototype.ReceiveFeedback());
// Start all tasks in parallel
await Task.WhenAll(tasks);
QuicServerPrototype.cs
using QuickDemoLib;
using System.Diagnostics;
using System.Net;
using System.Net.Quic;
using System.Net.Security;
using System.Text;
namespace QuicDemoLib;
public class QuicServerPrototype
{
public string IpAdressServer { get; private set; } = "192.168.1.119";
public int PortNumberServer { get; private set; } = 60830;
QuicServerConnectionOptions _serverConnectionOptions;
QuicListener _listener;
QuicConnection _connection;
QuicStream _outgoingStream;
QuicStream _incomingStream;
public double CounterSendDataPices { get; private set; } = 0;
public double AvgDelay_ms { get; private set; } = 0;
string imagePath = "D:\\Gitlab\\PreviewingStreamer\\src\\Previewing Streamer\\QuicDemoLib\\image_1.png";
public async Task Initi()
{
Debug.WriteLine("Starting QUIC server...");
if (!QuicConnection.IsSupported)
{
Debug.WriteLine("QUIC is not supported, check for presence of libmsquic and support of TLS 1.3.");
throw new NotSupportedException("QUIC is not supported, check for presence of libmsquic and support of TLS 1.3.");
}
try
{
_serverConnectionOptions = new QuicServerConnectionOptions
{
// Used to abort stream if it's not properly closed by the user.
// See https://www.rfc-editor.org/rfc/rfc9000#section-20.2
DefaultStreamErrorCode = 0x0A, // Protocol-dependent error code.
// Used to close the connection if it's not done by the user.
// See https://www.rfc-editor.org/rfc/rfc9000#section-20.2
DefaultCloseErrorCode = 0x0B, // Protocol-dependent error code.
MaxInboundBidirectionalStreams = 100,
MaxInboundUnidirectionalStreams = 10,
IdleTimeout = TimeSpan.FromSeconds(30),
// Same options as for server side SslStream.
ServerAuthenticationOptions = new SslServerAuthenticationOptions
{
// Specify the application protocols that the server supports. This list must be a subset of the protocols specified in QuicListenerOptions.ApplicationProtocols.
ApplicationProtocols = [new SslApplicationProtocol("protocol1")],
// Server certificate, it can also be provided via ServerCertificateContext or ServerCertificateSelectionCallback.
ServerCertificate = SelfSignedCertificate.CreateSelfSignedCertificate()
}
};
_listener = await QuicListener.ListenAsync(new QuicListenerOptions
{
// Define the endpoint on which the server will listen for incoming connections. The port number 0 can be replaced with any valid port number as needed.
ListenEndPoint = new IPEndPoint(IPAddress.Parse(IpAdressServer), PortNumberServer), //new IPEndPoint(IPAddress.Loopback, 60834), //
// List of all supported application protocols by this listener.
ApplicationProtocols = [new SslApplicationProtocol("protocol1")],
// Callback to provide options for the incoming connections, it gets called once per each connection.
ConnectionOptionsCallback = (_, _, _) => ValueTask.FromResult(_serverConnectionOptions)
});
Debug.WriteLine($"LocalEndPoint: {_listener.LocalEndPoint}.");
_connection = await _listener.AcceptConnectionAsync();
}
catch (Exception e)
{
throw new InvalidOperationException(e.Message, e);
}
}
public async Task SendStream()
{
int msgCounter = 1;
int sendArrayLength = 50 + 50 + 15284;
try
{
if (null == _outgoingStream)
{
_outgoingStream = await _connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional);
}
byte[] buffer = new byte[sendArrayLength];
bool isRunning = true;
bool isStreamStartAnnounced = false;
byte[] image = LoadPngToByteArray(imagePath);
while (isRunning)
{
if (!isStreamStartAnnounced)
{
Debug.WriteLine("Stream broadcasting started");
isStreamStartAnnounced = true;
}
string sendTimeStamp = DateTime.UtcNow.ToString("O");
byte[] sendTimeStampAsByte = Encoding.UTF8.GetBytes(sendTimeStamp);
sendTimeStampAsByte.CopyTo(buffer, 0);
byte[] msgFingerPrint = BitConverter.GetBytes(msgCounter);
msgFingerPrint.CopyTo(buffer, 50);
byte[] imageAsByte = image;
imageAsByte.CopyTo(buffer, 100);
await _outgoingStream.WriteAsync(buffer, 0, sendArrayLength);
msgCounter++;
await Task.Delay(50);
}
}
catch (Exception e)
{
throw new InvalidOperationException(e.Message, e);
}
}
public async Task ReceiveFeedback()
{
int msgCounter = 0;
try
{
//await Task.Delay(2000);
if (null == _incomingStream)
{
_incomingStream = await _connection.AcceptInboundStreamAsync();
}
byte[] buffer = new byte[50 + 50];
bool isRunning = true;
bool isStreamStartAnnounced = false;
while (isRunning)
{
if (!isStreamStartAnnounced)
{
Debug.WriteLine("Feedback receiving started");
isStreamStartAnnounced = true;
}
int readLength = await _incomingStream.ReadAsync(buffer, 0, 100);
var receiveTimeStamp = DateTime.UtcNow;
msgCounter++;
CounterSendDataPices++;
byte[] sendTimeStampAsByte = buffer.Take(50).ToArray();
byte[] msgIdByte = buffer.Skip(50).ToArray();
int msgId = BitConverter.ToInt32(msgIdByte, 0);
string sendTimeStampString = System.Text.Encoding.UTF8.GetString(sendTimeStampAsByte);
var isDateParsed = DateTime.TryParse(sendTimeStampString, out DateTime sendTimeStamp);
if (!isDateParsed)
{
Console.WriteLine("Data corrupt. Invalid date format received");
continue;
}
sendTimeStamp = sendTimeStamp.AddHours(-2);
if (receiveTimeStamp < sendTimeStamp)
{
Console.WriteLine("Error in package date.");
}
var delta_time = receiveTimeStamp - sendTimeStamp;
AvgDelay_ms = (AvgDelay_ms * (CounterSendDataPices - 1) + delta_time.TotalMilliseconds) / CounterSendDataPices;
Console.WriteLine($"{msgCounter.ToString("D5")} feedback: avg delay: {AvgDelay_ms.ToString("F3")} ms | lost packages: {msgCounter - msgId} ");
}
}
catch (Exception e)
{
throw new InvalidOperationException(e.Message, e);
}
}
byte[] LoadPngToByteArray(string filePath)
{
if (!File.Exists(filePath))
{
Console.WriteLine($"File not found at {filePath}");
throw new FileNotFoundException(filePath);
}
var file = File.ReadAllBytes(filePath);
return file;
}
}
Client:
Program.cs
QuicClientPrototype quickClientPrototype = new QuicClientPrototype();
try
{
await Task.Delay(4000);
await quickClientPrototype.Initi();
await quickClientPrototype.ReceiveStreamAndSendFeedback();
}
catch (Exception e)
{
throw new Exception(e.Message,e);
}
QuicClientPrototype.cs
using System.Diagnostics;
using System.Net;
using System.Net.Quic;
using System.Net.Security;
namespace QuictDemoLib;
public class QuicClientPrototype
{
public string IpAdressServer { get; private set; } = "192.168.1.119";
public int PortNumberServer { get; private set; } = 60830;
QuicClientConnectionOptions _clientConnectionOptions;
QuicConnection _connection;
QuicStream _outgoingStream;
QuicStream _incomingStream;
public delegate void ByteArraySentHandler(byte[] data);
public event ByteArraySentHandler? ByteArraySent;
public async Task Initi()
{
Debug.WriteLine("Starting QUIC client...");
if (!QuicConnection.IsSupported)
{
Debug.WriteLine("QUIC is not supported, check for presence of libmsquic and support of TLS 1.3.");
throw new NotSupportedException("QUIC is not supported, check for presence of libmsquic and support of TLS 1.3.");
}
try
{
_clientConnectionOptions = new QuicClientConnectionOptions
{
// End point of the server to connect to.
RemoteEndPoint = new IPEndPoint(IPAddress.Parse(IpAdressServer), PortNumberServer),
// Used to abort stream if it's not properly closed by the user.
// See https://www.rfc-editor.org/rfc/rfc9000#section-20.2
DefaultStreamErrorCode = 0x0A, // Protocol-dependent error code.
// Used to close the connection if it's not done by the user.
// See https://www.rfc-editor.org/rfc/rfc9000#section-20.2
DefaultCloseErrorCode = 0x0B, // Protocol-dependent error code.
// Optionally set limits for inbound streams.
MaxInboundUnidirectionalStreams = 10,
MaxInboundBidirectionalStreams = 100,
// Same options as for client side SslStream.
ClientAuthenticationOptions = new SslClientAuthenticationOptions
{
// List of supported application protocols.
ApplicationProtocols = [new SslApplicationProtocol("protocol1")],
// The name of the server the client is trying to connect to. Used for server certificate validation.
TargetHost = "localhost",
RemoteCertificateValidationCallback = (sender, certificate, chain, sslPolicyErrors) => true
}
};
_connection = await QuicConnection.ConnectAsync(_clientConnectionOptions);
Debug.WriteLine($"Connected {_connection.LocalEndPoint} --> {_connection.RemoteEndPoint}");
}
catch (Exception e)
{
if (null != _connection)
{
await _connection.CloseAsync(0x0C);
// Dispose the connection.
await _connection.DisposeAsync();
}
throw new InvalidOperationException(e.Message, e);
}
}
public async Task ReceiveStreamAndSendFeedback()
{
int msgCounter = 0;
int receiveArrayLength = 50 + 50 + 15284;
try
{
if (null == _incomingStream)
{
_incomingStream = await _connection.AcceptInboundStreamAsync();
}
if (null == _outgoingStream)
{
_outgoingStream = await _connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional);
}
byte[] buffer = new byte[receiveArrayLength];
bool isRunning = true;
bool isStreamStartAnnounced = false;
while (isRunning)
{
var streamLength = await _incomingStream.ReadAsync(buffer, 0, receiveArrayLength);
byte[] metaData = buffer.Take(100).ToArray();
if (metaData.All(b => b == 0))
{
continue;
}
byte[] msgIdByte = metaData.Skip(50).ToArray();
byte[] orgImage = LoadPngToByteArray("D:\\Gitlab\\PreviewingStreamer\\src\\Previewing Streamer\\QuicClientDemo\\image_1.png");
byte[] data = buffer.Skip(100).ToArray();
bool areEqual = orgImage.SequenceEqual(data);
if (!areEqual)
{
Console.WriteLine("Received corrupted data.");
}
if (streamLength == 0)
{
continue;
}
msgCounter++;
if (!isStreamStartAnnounced)
{
Debug.WriteLine("Stream receiving started");
isStreamStartAnnounced = true;
}
int msgId = BitConverter.ToInt32(msgIdByte, 0);
Stopwatch sw = new Stopwatch();
sw.Start();
ByteArraySent?.Invoke(data);
sw.Stop();
var swElapsed = sw.ElapsedMilliseconds;
//if (msgCounter % 10 == 0)
//{
Console.WriteLine($"{msgCounter.ToString("D5")} Client received id: {msgId} stream at {DateTime.UtcNow.ToString("HH:mm:ss.fff")}");
//}
//if (metaData.Any(b => b != 0))
//{
await _outgoingStream.WriteAsync(metaData, 0, 100);
//}
}
}
catch (Exception e)
{
throw new InvalidOperationException(e.Message, e);
}
finally
{
// Close the connection with the custom code.
await _connection.CloseAsync(0x0C);
// Dispose the connection.
await _connection.DisposeAsync();
}
}
public async Task SendStream(Stream stream)
{
try
{
if (null == _outgoingStream)
{
_outgoingStream = await _connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional);
}
}
catch (Exception e)
{
throw new InvalidOperationException(e.Message, e);
}
finally
{
// Close the connection with the custom code.
await _connection.CloseAsync(0x0C);
// Dispose the connection.
await _connection.DisposeAsync();
}
}
byte[] LoadPngToByteArray(string filePath)
{
if (!File.Exists(filePath))
{
Console.WriteLine($"File not found at {filePath}");
throw new FileNotFoundException(filePath);
}
var file = File.ReadAllBytes(filePath);
return file;
}
}
In client I compare received .png with the orginal one, that way I am sure, that there are streams, which are delivered correctly. I tried different streaming frequnecies only one stream every 2 seconds (of size 15 KB) to be sure that the bandwidth is not the problem. Both laptops used in tests are new and WiFi routers are fast HP routers made for small buisness, so I assume hardware itselfs is not a problem.
Inspecting the a few streams, I found out that mostly only ca. 5 % of bytes have corrupted data. However only ca. 60 % of written streams are received correctly by client. Table is a part of preview of the correct send stream and corrupt received stream, maybe it helps somehow.
Correct | Corrupt |
---|---|
0 | 0 |
2 | 0 |
68 | 73 |
25 | 69 |
0 | 78 |
0 | 68 |
0 | 174 |
128 | 66 |
0 | 96 |
81 | 130 |
6 | 6 |
0 | 0 |
32 | 32 |
64 | 64 |
148 | 148 |
1 | 1 |
0 | 0 |
8 | 8 |
16 | 16 |
101 | 101 |
I have no clue why part of the bytes is sometimes altered and what am I doing wrong. I would also appreciate if anybody knows some more resources on quic in .net aprat from already linked article.
Thank you!
Upvotes: 1
Views: 65