Reputation: 2843
I am testing a simple TCP server in C# that I aim to be able to connect 10k clients concurrently:
Server
public class HighPerfTcpServer
{
private Socket? _listeningSocket;
private ILogger<HighPerfTcpServer> _logger;
public HighPerfTcpServer(ILogger<HighPerfTcpServer> logger)
{
_logger = logger;
}
public async Task<int> Start(TcpServerOptions options, ITcpServerSessionsHandler sessionHandler)
{
var address = options.Address ?? "127.0.0.1";
var port = options.Port ?? 0;
var ipAddress = IPAddress.Parse(address);
var localEndPoint = new IPEndPoint(ipAddress, port);
_listeningSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
_listeningSocket.Bind(localEndPoint);
_listeningSocket.Listen(int.MaxValue);
var acceptArgs = new SocketAsyncEventArgs();
acceptArgs.Completed += AcceptArgs_Completed;
StartAccept(acceptArgs);
var currentLocalEndPoint = _listeningSocket.LocalEndPoint as IPEndPoint;
ArgumentNullException.ThrowIfNull(currentLocalEndPoint);
return currentLocalEndPoint.Port;
}
private void StartAccept(SocketAsyncEventArgs acceptArgs)
{
if (_listeningSocket == null)
{
return;
}
var willRaiseEvent = false;
while (!willRaiseEvent)
{
acceptArgs.AcceptSocket = null;
try
{
willRaiseEvent = _listeningSocket.AcceptAsync(acceptArgs);
if (!willRaiseEvent)
{
ProcessAccept(acceptArgs);
}
}
catch
{
break;
}
}
}
private void ProcessAccept(SocketAsyncEventArgs acceptArgs)
{
_logger.LogDebug($"client connected");
}
private void AcceptArgs_Completed(object? sender, SocketAsyncEventArgs e)
{
if (e.LastOperation == SocketAsyncOperation.Accept)
{
ProcessAccept(e);
StartAccept(e);
}
}
public async Task Stop()
{
_listeningSocket?.Dispose();
}
}
Client
public class HighPerfTcpClient
{
private TaskCompletionSource? _connectAwaiter;
private TaskCompletionSource? _disconnectAwaiter;
private Socket? _clientSocket;
public async Task<ITcpClientConnection> Connect(TcpClientOptions options)
{
ArgumentNullException.ThrowIfNull(options.Address);
ArgumentNullException.ThrowIfNull(options.Port);
var ipAddress = IPAddress.Parse(options.Address);
var remoteEndPoint = new IPEndPoint(ipAddress, options.Port.Value);
_clientSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);
var connectEventArgs = new SocketAsyncEventArgs();
connectEventArgs.Completed += ConnectEventArgs_Completed;
connectEventArgs.RemoteEndPoint = remoteEndPoint;
_connectAwaiter = new TaskCompletionSource();
var willRaiseEvent = _clientSocket.ConnectAsync(connectEventArgs);
if (willRaiseEvent)
{
await _connectAwaiter.Task.ConfigureAwait(false);
}
if (connectEventArgs.SocketError != SocketError.Success)
{
throw new Exception($"socket connect error: {connectEventArgs.SocketError}");
}
return new TcpClientConnection(DataSender);
}
private async Task DataSender(ArraySegment<byte> segment)
{
}
private void ConnectEventArgs_Completed(object? sender, SocketAsyncEventArgs e)
{
if (e.LastOperation == SocketAsyncOperation.Connect)
{
_connectAwaiter?.SetResult();
}
}
public async Task Disconnect()
{
if (_clientSocket != null)
{
var disconnectArgs = new SocketAsyncEventArgs();
disconnectArgs.Completed += DisconnectArgs_Completed;
_disconnectAwaiter = new TaskCompletionSource();
var willRaiseEvent = _clientSocket.DisconnectAsync(disconnectArgs);
if (willRaiseEvent)
{
await _disconnectAwaiter.Task.ConfigureAwait(false);
}
}
}
private void DisconnectArgs_Completed(object? sender, SocketAsyncEventArgs e)
{
if (e.LastOperation == SocketAsyncOperation.Disconnect)
{
_disconnectAwaiter?.SetResult();
}
}
}
Test Case
public class TcpClientServerIntegrationTests
{
private static ILoggerFactory _factory;
[Fact]
public async Task Should_be_able_to_up_server_and_clients_connect_and_disconnect()
{
var testLogger = GetLogger<TcpClientServerIntegrationTests>();
var sessionHandlerMock = new Mock<ITcpServerSessionsHandler>();
var server = new HighPerfTcpServer(GetLogger<HighPerfTcpServer>());
var connections = 10 * 1000;
var port = 10000;
var tcpServerOptions = new TcpServerOptions
{
Address = "127.0.0.1",
ConnectionBufferSize = 128 * 1024,
MaxConnections = connections,
Port = port
};
var startedPort = await server.Start(tcpServerOptions, sessionHandlerMock.Object);
var tasks = Enumerable.Range(0, connections).Select(e =>
{
return Task.Run(async () =>
{
var client = new HighPerfTcpClient();
var clientOptions = new TcpClientOptions
{
Address = "127.0.0.1",
Port = startedPort,
BufferSize = 128 * 1024,
OnData = OnClientData,
OnDisconnect = OnDisconnect
};
await client.Connect(clientOptions);
testLogger.LogDebug($"{e} client connected");
await client.Disconnect();
testLogger.LogDebug($"{e} client disconnected");
});
});
await Task.WhenAll(tasks);
}
private async Task OnDisconnect()
{
}
private async Task OnClientData(ArraySegment<byte> segment)
{
}
private ILogger<T> GetLogger<T>()
{
return _factory.CreateLogger<T>();
}
static TcpClientServerIntegrationTests()
{
var configuration = new LoggingConfiguration();
var target = new FileTarget();
target.FileName = "./logs/log.${logger}.txt";
target.KeepFileOpen = false;
configuration.AddRule(NLog.LogLevel.Debug, NLog.LogLevel.Error, target);
_factory = LoggerFactory.Create(builder =>
{
builder.AddNLog(configuration);
});
}
}
When the test is run, the server only gets a connection for about 2k connections and the rest of clients are thrown with error ConnectionRefused
, where I would expect all 10k requests are completed since the Listen
method of server is given a backlog of int.MaxValue
.
I tried to use different possible values for the backlog and found no difference. Anyone can shed some light on this issue?
Upvotes: 0
Views: 65