Reputation: 3993
I have client and server written in classic .net communicating through TCP sockets. I also have load tests with many parallel connections and it all works.
However, same code using .netcore breaks. On linux it breaks all the time with client getting exceptions while trying to read from stream:
Client socket error: Unable to read data from the transport connection: Connection timed out.
Or server might return 0 as bytes to read as well.
On Windows .netcore client breaks less often, but still breaks sometimes with error like:
socket error: Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond
.netcore 3.0 by the way.
Any ideas why that happens?
Client:
public class TcpConnection
{
object _lock = new object();
bool _is_busy = false;
public bool TakeLock()
{
lock (_lock)
{
if (_is_busy)
{
return false;
}
else
{
_is_busy = true;
return true;
}
}
}
public void ReleaseLock()
{
_is_busy = false;
}
public bool Connected { get; set; }
public string ConnError { get; set; }
public Socket client { get; set; }
public Stream stream { get; set; }
public BinaryWriter bw { get; set; }
public DateTime LastUsed { get; set; }
public int Index { get; set; }
public TcpConnection(string hostname, int port)
{
client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
SocketAsyncEventArgs connectEventArg = new SocketAsyncEventArgs();
connectEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(ConnectedEvent);
connectEventArg.UserToken = this;
connectEventArg.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(hostname), port);
var connected = client.ConnectAsync(connectEventArg);
if (!connected)
{
if (connectEventArg.SocketError != SocketError.Success)
{
#if (VERBOSE)
Console.WriteLine("Connection error (immediate)");
#endif
throw new LinqDbException("Linqdb: Connection error (immediate)");
}
#if (VERBOSE)
Console.WriteLine("Connected immediately");
#endif
//client.NoDelay = true;
client.ReceiveTimeout = 60000;
client.SendTimeout = 60000;
this.stream = new NetworkStream(client);
this.bw = new BinaryWriter(stream);
}
else
{
int total_wait_ms = 0;
while (!this.Connected)
{
Thread.Sleep(100);
total_wait_ms += 100;
#if (VERBOSE)
if (total_wait_ms % 2000 == 0)
{
Console.WriteLine("Can't connect in {0} ms", total_wait_ms);
}
#endif
}
if (!string.IsNullOrEmpty(this.ConnError))
{
throw new LinqDbException(this.ConnError + " after " + total_wait_ms + " ms wait time");
}
else
{
#if (VERBOSE)
Console.WriteLine("Connected {0} ms", total_wait_ms);
#endif
}
}
_is_busy = true;
LastUsed = DateTime.Now;
}
private void ConnectedEvent(object sender, SocketAsyncEventArgs e)
{
TcpConnection conn = e.UserToken as TcpConnection;
if (e.SocketError != SocketError.Success)
{
#if (VERBOSE)
Console.WriteLine("Connection error");
#endif
conn.ConnError = "Connection error";
conn.Connected = true;
return;
}
//e.ConnectSocket.NoDelay = true;
e.ConnectSocket.ReceiveTimeout = 60000;
e.ConnectSocket.SendTimeout = 60000;
conn.stream = new NetworkStream(conn.client);
conn.bw = new BinaryWriter(conn.stream);
conn.ConnError = null;
conn.Connected = true;
}
}
public class ClientSockets
{
const int _limit = 100;
TcpConnection[] cons = new TcpConnection[_limit];
object _lock = new object();
object[] _locks = null;
public byte[] CallServer(byte[] input, string hostname, int port, out string error_msg)
{
error_msg = null;
if (_locks == null)
{
lock (_lock)
{
if (_locks == null)
{
_locks = new object[_limit];
for (int i = 0; i < _limit; i++)
{
_locks[i] = new object();
}
}
}
}
TcpConnection conn = null;
while (true)
{
int last_index = 0;
for (int i = _limit - 1; i >= 0; i--)
{
if (cons[i] != null)
{
last_index = i;
break;
}
}
for (int i = 0; i < _limit; i++)
{
var tmp = cons[i];
if (tmp != null)
{
var available = tmp.TakeLock();
if (!available)
{
continue;
}
else
{
if ((DateTime.Now - tmp.LastUsed).TotalSeconds > 30)
{
cons[i] = null;
try
{
tmp.client.Dispose();
tmp.stream.Dispose();
tmp.bw.Dispose();
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Disposing error:" + ex.Message);
#endif
}
continue;
}
else
{
//ping
tmp.bw.Write(BitConverter.GetBytes(-3));
tmp.bw.Flush();
int numBytesRead = 0;
var data = new byte[1024];
var bad = false;
while (numBytesRead < 4)
{
int read = 0;
try
{
read = tmp.stream.Read(data, numBytesRead, data.Length - numBytesRead);
}
catch (Exception ex)
{
//server closed connection
bad = true;
break;
}
numBytesRead += read;
if (read <= 0)
{
//server closed connection
bad = true;
break;
}
}
if (bad)
{
cons[i] = null;
try
{
tmp.client.Dispose();
tmp.stream.Dispose();
tmp.bw.Dispose();
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Disposing error:" + ex.Message);
#endif
}
continue;
}
var pong = BitConverter.ToInt32(new byte[4] { data[0], data[1], data[2], data[3] }, 0);
if (pong != -3)
{
cons[i] = null;
try
{
tmp.client.Dispose();
tmp.stream.Dispose();
tmp.bw.Dispose();
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Disposing error:" + ex.Message);
#endif
}
continue;
}
//socket is ok
conn = tmp;
break;
}
}
}
else
{
if (i < last_index)
{
continue;
}
if (Monitor.TryEnter(_locks[i]))
{
try
{
if (cons[i] != null)
{
continue;
}
conn = new TcpConnection(hostname, port);
cons[i] = conn;
conn.Index = i;
break;
}
catch (Exception ex)
{
conn = null;
cons[i] = null;
#if (VERBOSE)
Console.WriteLine("Client socket creation error: " + ex.Message);
#endif
error_msg = ex.Message;
return BitConverter.GetBytes(-1);
}
finally
{
Monitor.Exit(_locks[i]);
}
}
else
{
continue;
}
}
}
if (conn == null)
{
Thread.Sleep(150);
continue;
}
else
{
break;
}
}
bool error = false;
try
{
var length = BitConverter.GetBytes(input.Length);
var data = new byte[1024];
conn.bw.Write(input);
conn.bw.Flush();
using (MemoryStream ms = new MemoryStream())
{
int numBytesRead;
int total;
while (true)
{
numBytesRead = 0;
while (numBytesRead < 4)
{
int read = conn.stream.Read(data, numBytesRead, data.Length - numBytesRead);
numBytesRead += read;
if (read <= 0)
{
throw new LinqDbException("Read <= 0: " + read);
}
}
numBytesRead -= 4;
total = BitConverter.ToInt32(new byte[4] { data[0], data[1], data[2], data[3] }, 0);
if (total == -2)
{
#if (VERBOSE)
Console.WriteLine("PINGER!!!");
#endif
continue;
}
break;
}
if (numBytesRead > 0)
{
var finput = new byte[numBytesRead];
for (int i = 0; i < numBytesRead; i++)
{
finput[i] = data[4 + i];
}
ms.Write(finput, 0, numBytesRead);
}
total -= numBytesRead;
while (total > 0)
{
numBytesRead = conn.stream.Read(data, 0, data.Length);
if (numBytesRead <= 0)
{
throw new LinqDbException("numBytesRead <= 0: " + numBytesRead);
}
ms.Write(data, 0, numBytesRead);
total -= numBytesRead;
}
conn.LastUsed = DateTime.Now;
return ms.ToArray();
}
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Client socket error: " + ex.Message);
#endif
error = true;
error_msg = ex.Message;
return BitConverter.GetBytes(-1);
}
finally
{
if (!error)
{
conn.ReleaseLock();
}
else
{
cons[conn.Index] = null;
try
{
conn.client.Dispose();
conn.stream.Dispose();
conn.bw.Dispose();
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Disposing error:" + ex.Message);
#endif
}
}
}
}
}
Server:
class Pinger
{
public bool Done { get; set; }
public object _lock = new object();
public BinaryWriter bw { get; set; }
public void Do()
{
try
{
int total_wait = 0;
int sleep_ms = 2000;
while (!Done)
{
Thread.Sleep(sleep_ms);
total_wait += sleep_ms;
if (total_wait % 10000 == 0)
{
lock (_lock)
{
if (!Done)
{
bw.Write(BitConverter.GetBytes(-2));
bw.Flush();
}
}
}
}
}
catch { return; }
}
}
class ServerSockets
{
static Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
static string db_path = null;
static int port = 0;
public static void Main()
{
AppDomain.CurrentDomain.ProcessExit += new EventHandler(OnProcessExit);
CommandHelper.ReadConfig(out db_path, out port);
var sw = new Stopwatch();
sw.Start();
Console.WriteLine("Building in-memory indexes...");
ServerLogic.Logic.ServerBuildIndexesOnStart(db_path);
sw.Stop();
Console.WriteLine("Done building in-memory indexes. It took: " + Math.Round(sw.ElapsedMilliseconds / 60000.0, 0) + " min.");
Console.WriteLine("Listening on " + port);
listener.Bind(new IPEndPoint(IPAddress.Any, port));
listener.Listen((int)SocketOptionName.MaxConnections);
SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(Service);
bool willRaiseEvent = listener.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
Service(null, acceptEventArg);
}
while (true)
{
try
{
Thread.Sleep(60000);
#if (VERBOSE)
Console.WriteLine("Still kicking...");
#endif
}
catch (Exception ex)
{
Console.WriteLine("BAD ERROR... " + ex.Message);
}
}
}
static void OnProcessExit(object sender, EventArgs e)
{
ServerLogic.Logic.Dispose();
}
private static void LoopToStartAccept()
{
SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(Service);
bool willRaiseEvent = listener.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
Service(null, acceptEventArg);
}
}
private static void HandleBadAccept(SocketAsyncEventArgs acceptEventArgs)
{
#if (VERBOSE)
Console.WriteLine("bad accept");
#endif
acceptEventArgs.AcceptSocket.Dispose();
}
private static void Service(object sender, SocketAsyncEventArgs e)
{
if (e.SocketError != SocketError.Success)
{
LoopToStartAccept();
HandleBadAccept(e);
return;
}
LoopToStartAccept();
try
{
using (Socket soc = e.AcceptSocket)
{
var rg = new Random();
#if (VERBOSE)
Console.WriteLine("New socket: " + rg.Next(0, 1000000));
#endif
//soc.NoDelay = true;
soc.ReceiveTimeout = 60000;
soc.SendTimeout = 60000;
using (Stream stream = new NetworkStream(soc))
using (BinaryWriter bw = new BinaryWriter(stream))
{
while (true) //reuse same connection for many commands
{
byte[] data = new byte[1024];
using (MemoryStream ms = new MemoryStream())
{
int numBytesRead = 0;
while (numBytesRead < 4)
{
int read = 0;
try
{
read = stream.Read(data, numBytesRead, data.Length - numBytesRead);
}
catch (Exception ex)
{
//client closed connection
return;
}
numBytesRead += read;
if (read <= 0)
{
//throw new Exception("Read <= 0: " + read);
//client closed connection
return;
}
}
numBytesRead -= 4;
var total = BitConverter.ToInt32(new byte[4] { data[0], data[1], data[2], data[3] }, 0);
if (total == -3) //ping
{
//pong
bw.Write(BitConverter.GetBytes(-3));
bw.Flush();
continue;
}
if (numBytesRead > 0)
{
var finput = new byte[numBytesRead];
for (int i = 0; i < numBytesRead; i++)
{
finput[i] = data[4 + i];
}
ms.Write(finput, 0, numBytesRead);
}
total -= numBytesRead;
while (total > 0)
{
numBytesRead = stream.Read(data, 0, data.Length);
if (numBytesRead <= 0)
{
throw new Exception("numBytesRead <= 0: " + numBytesRead);
}
ms.Write(data, 0, numBytesRead);
total -= numBytesRead;
}
var input = ms.ToArray();
var pinger = new Pinger()
{
bw = bw
};
ThreadPool.QueueUserWorkItem(f => { pinger.Do(); });
var output = ServerLogic.Logic.Execute(input, db_path);
pinger.Done = true;
lock (pinger._lock)
{
bw.Write(output);
bw.Flush();
}
}
}
}
}
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Socket error: " + ex.Message);
#endif
//try
//{
// var rg = new Random();
// File.WriteAllText("sock_error_" + rg.Next() + ".txt", ex.Message + " " + ex.StackTrace + (ex.InnerException != null ? (" " + ex.InnerException.Message + " " + ex.InnerException.StackTrace) : ""));
//}
//catch (Exception) { }
return;
}
finally
{
#if (VERBOSE)
Console.WriteLine("Listener finally ");
#endif
}
}
}
EDIT Fully reproducible project: https://github.com/ren85/serverclientbug
EDIT No solution still, 500 more points to whoever can fix this.
EDIT Maybe related https://github.com/dotnet/coreclr/issues/11979 https://github.com/dotnet/runtime/issues/364
Upvotes: 2
Views: 1583
Reputation: 3993
Luckily the problem reproduced with just 2 parallel threads. The problem was in server part:
private static void LoopToStartAccept()
{
SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(Service);
bool willRaiseEvent = listener.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
Service(null, acceptEventArg);
}
}
Service(null, acceptEventArg); blocked responding socket. Changing it to
Task.Run(() => Service(null, acceptEventArg));
fixed the problem. No idea why it worked on Windows though.
Changing connection in client to synchronous as @AlesD suggested also solved some other problems.
Upvotes: 1
Reputation: 12052
I debugged your code and looks like the problem are bugs not issues with sockets in .NET core. From the code it looks you expect that the first four bytes of the data sent will contain the length of the data but you send only the data. This causes that you get a random data length since the first four bytes of the data are used as the length. This in some cases is bigger than the actual data and then the while loop that is reading the data timeouts on waiting for more data that never arrives.
Here is the problematic part of the code in the client:
var length = BitConverter.GetBytes(input.Length); // You prepare the length
var data = new byte[1024];
conn.bw.Write(length); // This is missing in your code so it never gets sent to the server
conn.bw.Write(input);
conn.bw.Flush();
Full client with the fix included:
public class ClientSockets
{
const int _limit = 100;
TcpConnection[] cons = new TcpConnection[_limit];
object _lock = new object();
object[] _locks = null;
public byte[] CallServer(byte[] input, string hostname, int port, out string error_msg)
{
error_msg = null;
if (_locks == null)
{
lock (_lock)
{
if (_locks == null)
{
_locks = new object[_limit];
for (int i = 0; i < _limit; i++)
{
_locks[i] = new object();
}
}
}
}
TcpConnection conn = null;
while (true)
{
int last_index = 0;
for (int i = _limit - 1; i >= 0; i--)
{
if (cons[i] != null)
{
last_index = i;
break;
}
}
for (int i = 0; i < _limit; i++)
{
var tmp = cons[i];
if (tmp != null)
{
var available = tmp.TakeLock();
if (!available)
{
continue;
}
else
{
if ((DateTime.Now - tmp.LastUsed).TotalSeconds > 30)
{
cons[i] = null;
try
{
tmp.client.Dispose();
tmp.stream.Dispose();
tmp.bw.Dispose();
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Disposing error:" + ex.Message);
#endif
}
continue;
}
else
{
//ping
tmp.bw.Write(BitConverter.GetBytes(-3));
tmp.bw.Flush();
int numBytesRead = 0;
var data = new byte[1024];
var bad = false;
while (numBytesRead < 4)
{
int read = 0;
try
{
read = tmp.stream.Read(data, numBytesRead, data.Length - numBytesRead);
}
catch (Exception ex)
{
//server closed connection
bad = true;
break;
}
numBytesRead += read;
if (read <= 0)
{
//server closed connection
bad = true;
break;
}
}
if (bad)
{
cons[i] = null;
try
{
tmp.client.Dispose();
tmp.stream.Dispose();
tmp.bw.Dispose();
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Disposing error:" + ex.Message);
#endif
}
continue;
}
var pong = BitConverter.ToInt32(new byte[4] { data[0], data[1], data[2], data[3] }, 0);
if (pong != -3)
{
cons[i] = null;
try
{
tmp.client.Dispose();
tmp.stream.Dispose();
tmp.bw.Dispose();
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Disposing error:" + ex.Message);
#endif
}
continue;
}
//socket is ok
conn = tmp;
break;
}
}
}
else
{
if (i < last_index)
{
continue;
}
if (Monitor.TryEnter(_locks[i]))
{
try
{
if (cons[i] != null)
{
continue;
}
conn = new TcpConnection(hostname, port);
cons[i] = conn;
conn.Index = i;
break;
}
catch (Exception ex)
{
conn = null;
cons[i] = null;
#if (VERBOSE)
Console.WriteLine("Client socket creation error: " + ex.Message);
#endif
error_msg = ex.Message;
return BitConverter.GetBytes(-1);
}
finally
{
Monitor.Exit(_locks[i]);
}
}
else
{
continue;
}
}
}
if (conn == null)
{
Thread.Sleep(150);
continue;
}
else
{
break;
}
}
bool error = false;
try
{
var length = BitConverter.GetBytes(input.Length);
var data = new byte[1024];
conn.bw.Write(length); // Send the length first.
conn.bw.Write(input);
conn.bw.Flush();
using (MemoryStream ms = new MemoryStream())
{
int numBytesRead;
int total;
while (true)
{
numBytesRead = 0;
while (numBytesRead < 4)
{
int read = conn.stream.Read(data, numBytesRead, data.Length - numBytesRead);
numBytesRead += read;
if (read <= 0)
{
throw new LinqDbException("Read <= 0: " + read);
}
}
numBytesRead -= 4;
total = BitConverter.ToInt32(new byte[4] { data[0], data[1], data[2], data[3] }, 0);
if (total == -2)
{
#if (VERBOSE)
Console.WriteLine("PINGER!!!");
#endif
continue;
}
break;
}
if (numBytesRead > 0)
{
var finput = new byte[numBytesRead];
for (int i = 0; i < numBytesRead; i++)
{
finput[i] = data[4 + i];
}
ms.Write(finput, 0, numBytesRead);
}
total -= numBytesRead;
while (total > 0)
{
numBytesRead = conn.stream.Read(data, 0, data.Length);
if (numBytesRead <= 0)
{
throw new LinqDbException("numBytesRead <= 0: " + numBytesRead);
}
ms.Write(data, 0, numBytesRead);
total -= numBytesRead;
}
conn.LastUsed = DateTime.Now;
return ms.ToArray();
}
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Client socket error: " + ex.Message);
#endif
error = true;
error_msg = ex.Message;
return BitConverter.GetBytes(-1);
}
finally
{
if (!error)
{
conn.ReleaseLock();
}
else
{
cons[conn.Index] = null;
try
{
conn.client.Dispose();
conn.stream.Dispose();
conn.bw.Dispose();
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Disposing error:" + ex.Message);
#endif
}
}
}
}
}
The same issue is also in the server code:
var output = ServerLogic.Logic.Execute(input, db_path);
var length = BitConverter.GetBytes(output.Length); // You again need to get the length
pinger.Done = true;
lock (pinger._lock)
{
bw.Write(length); // Send it before the data
bw.Write(output);
bw.Flush();
}
Full Server with the fix included:
class ServerSockets
{
static Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
static string db_path = null;
static int port = 0;
public static void Main()
{
AppDomain.CurrentDomain.ProcessExit += new EventHandler(OnProcessExit);
CommandHelper.ReadConfig(out db_path, out port);
var sw = new Stopwatch();
sw.Start();
Console.WriteLine("Building in-memory indexes...");
ServerLogic.Logic.ServerBuildIndexesOnStart(db_path);
sw.Stop();
Console.WriteLine("Done building in-memory indexes. It took: " + Math.Round(sw.ElapsedMilliseconds / 60000.0, 0) + " min.");
Console.WriteLine("Listening on " + port);
listener.Bind(new IPEndPoint(IPAddress.Any, port));
listener.Listen((int)SocketOptionName.MaxConnections);
SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(Service);
bool willRaiseEvent = listener.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
Service(null, acceptEventArg);
}
while (true)
{
try
{
Thread.Sleep(60000);
#if (VERBOSE)
Console.WriteLine("Still kicking...");
#endif
}
catch (Exception ex)
{
Console.WriteLine("BAD ERROR... " + ex.Message);
}
}
}
static void OnProcessExit(object sender, EventArgs e)
{
ServerLogic.Logic.Dispose();
}
private static void LoopToStartAccept()
{
SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(Service);
bool willRaiseEvent = listener.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
Service(null, acceptEventArg);
}
}
private static void HandleBadAccept(SocketAsyncEventArgs acceptEventArgs)
{
#if (VERBOSE)
Console.WriteLine("bad accept");
#endif
acceptEventArgs.AcceptSocket.Dispose();
}
private static void Service(object sender, SocketAsyncEventArgs e)
{
if (e.SocketError != SocketError.Success)
{
LoopToStartAccept();
HandleBadAccept(e);
return;
}
LoopToStartAccept();
try
{
using (Socket soc = e.AcceptSocket)
{
var rg = new Random();
#if (VERBOSE)
Console.WriteLine("New socket: " + rg.Next(0, 1000000));
#endif
//soc.NoDelay = true;
soc.ReceiveTimeout = 60000;
soc.SendTimeout = 60000;
using (Stream stream = new NetworkStream(soc))
using (BinaryWriter bw = new BinaryWriter(stream))
{
while (true) //reuse same connection for many commands
{
byte[] data = new byte[1024];
using (MemoryStream ms = new MemoryStream())
{
int numBytesRead = 0;
while (numBytesRead < 4)
{
int read = 0;
try
{
read = stream.Read(data, numBytesRead, data.Length - numBytesRead);
}
catch (Exception ex)
{
//client closed connection
return;
}
numBytesRead += read;
if (read <= 0)
{
//throw new Exception("Read <= 0: " + read);
//client closed connection
return;
}
}
numBytesRead -= 4;
var total = BitConverter.ToInt32(new byte[4] { data[0], data[1], data[2], data[3] }, 0);
if (total == -3) //ping
{
//pong
bw.Write(BitConverter.GetBytes(-3));
bw.Flush();
continue;
}
if (numBytesRead > 0)
{
var finput = new byte[numBytesRead];
for (int i = 0; i < numBytesRead; i++)
{
finput[i] = data[4 + i];
}
ms.Write(finput, 0, numBytesRead);
}
total -= numBytesRead;
while (total > 0)
{
numBytesRead = stream.Read(data, 0, data.Length);
if (numBytesRead <= 0)
{
throw new Exception("numBytesRead <= 0: " + numBytesRead);
}
ms.Write(data, 0, numBytesRead);
total -= numBytesRead;
}
var input = ms.ToArray();
var pinger = new Pinger()
{
bw = bw
};
ThreadPool.QueueUserWorkItem(f => { pinger.Do(); });
var output = ServerLogic.Logic.Execute(input, db_path);
var length = BitConverter.GetBytes(output.Length);
pinger.Done = true;
lock (pinger._lock)
{
bw.Write(length);
bw.Write(output);
bw.Flush();
}
}
}
}
}
}
catch (Exception ex)
{
#if (VERBOSE)
Console.WriteLine("Socket error: " + ex.Message);
#endif
//try
//{
// var rg = new Random();
// File.WriteAllText("sock_error_" + rg.Next() + ".txt", ex.Message + " " + ex.StackTrace + (ex.InnerException != null ? (" " + ex.InnerException.Message + " " + ex.InnerException.StackTrace) : ""));
//}
//catch (Exception) { }
return;
}
finally
{
#if (VERBOSE)
Console.WriteLine("Listener finally ");
#endif
}
}
}
For me with this two changes the code works in Windows and Linux. If you still have an issue you will need to provide also more details what data you are sending.
I did some more research on Linux after you posted your GitHub sample. As I already suggested in the comment below you should use the Connect
method instead of the ConnectAsync
to get better behavior. In my test on Raspbian and Arch Linux once you open lots of threads in the client you probably occupy all the ThreadPool so the Connect events stop happening and everything timeouts. Since your code loops on threads and waits you don't gain much by using the Async method anyway.
Here is the changed TcpConnection
class to do synchronous connections:
public class TcpConnection
{
object _lock = new object();
bool _is_busy = false;
public bool TakeLock()
{
lock (_lock)
{
if (_is_busy)
{
return false;
}
else
{
_is_busy = true;
return true;
}
}
}
public void ReleaseLock()
{
_is_busy = false;
}
public bool Connected { get; set; }
public string ConnError { get; set; }
public Socket client { get; set; }
public Stream stream { get; set; }
public BinaryWriter bw { get; set; }
public DateTime LastUsed { get; set; }
public int Index { get; set; }
public TcpConnection(string hostname, int port)
{
client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
client.Connect(new IPEndPoint(IPAddress.Parse(hostname), port));
if (client.Connected)
{
#if (VERBOSE)
Console.WriteLine("Connected immediately");
#endif
//client.NoDelay = true;
client.ReceiveTimeout = 60000;
client.SendTimeout = 60000;
this.stream = new NetworkStream(client);
this.bw = new BinaryWriter(stream);
}
_is_busy = true;
LastUsed = DateTime.Now;
}
}
As a final comment I have to say your code is very complicated and does not follow best practices as already other people pointed out. I advise you to learn more about about multi threading and asynchronous programming and then improve the code which will also make it work better and more predictable.
Here are the links to the Microsoft samples on how to work with sockets asynchronously and the general documentation about how to do Async programming:
Asynchronous Client Socket Example
Asynchronous Server Socket Example
Parallel Processing, Concurrency, and Async Programming in .NET
Upvotes: 10
Reputation: 900
Let me start off with saying that the code is incredibly hard to understand. Not only does a lot of things happen within a method, you're also dealing with threads and (a lot of) locks. This makes things incredibly complex and very hard to pinpoint whats going on.
Its like trying to find a needle in the haystack. Except a hundred times worse because of timeouts, threads and locking mechanisms that constantly shift the needle.
I've ran this project on a Windows 10
machine, with Visual Studio 2019 Preview (16.4.0 preview 2.0)
and .NET Core runtime 3.1.0-preview1
(target framework being 3.0
).
Let me state one thing though, you have set the receive/set timeout
for both the client and server to 60000
. I cannot reproduce your error using this. Because this timeout number is so big, everything eventually gets finished before a timeout can even occur.
In order to reproduce your errors I have downscaled the timeout considerably:
// File: ClientLib.cs
// Method: "ConnectedEvent(...)"
// Line 114-115
e.ConnectSocket.ReceiveTimeout = 1000;
// Lowering it even more multiplies the errors.
The above now gives me about ~5 errors, which then again simply happens because of this code (where the Pinger
is instantiated):
// File: Server\Program.cs
// Method: "Service(...)"
Thread.Sleep(random.Next(100, 1000));
where also the following happens just above:
ThreadPool.QueueUserWorkItem(f => { pinger.Do(); });
The server uses the ThreadPool
and it can handle 8 operations1 concurrently on it. Not only are you using the ThreadPool
when a Socket
connection is established, but also when you do a ThreadPool.QueueUserWorkItem(...)
.
This means big trouble when the client starts trying to establish a huge amount of connections. As this can cause timeouts too.
One way of increasing the threads in the ThreadPool
:
public static void Main()
{
int w, c = 0;
ThreadPool.GetMinThreads(out w, out c);
Console.WriteLine($"ThreadPool max: {w} worker threads {c} completion threads");
ThreadPool.SetMinThreads(100, 100);
ThreadPool.GetMinThreads(out w, out c);
Console.WriteLine($"ThreadPool max: {w} worker threads {c} completion threads");
/// other code...
}
But really, just removing these two lines solves the problem:
var random = new Random();
Thread.Sleep(random.Next(100, 1000));
1 This can differ per machine, see the documentation. You can check it on your machine by using ThreadPool.GetMinThreads(...).
Upvotes: 5