Reputation: 129
I have a multithreaded server (server.cs) and a client (client.cs). The objective of the program is as follows:
Each client program creates a connection to the server. The server replies with a welcome message (a string). The client sends the server a string in JSON format. The server replies in the same format, with added information on the secret and the END status. Then the client sends a message to the server to stop the communication and closes the connection. When all the clients have communicated, a final client with id = -1 informs the server to stop. When the server receives the message from this ending client (with id = -1), it must print all collected information and the number of clients that communicated.
The biggest part is done. When I use the SequentialSimulation() method in my client.cs and run many instances of the client.cs program, the server works just fine and does what the description above says. But when I use the ConcurrentSimulation() method in client.cs and run only one instance of client.cs, it crashes and gives me the following error:
Unhandled exception. System.AggregateException: One or more errors occurred. (Cannot access a disposed object.
Object name: 'System.Net.Sockets.Socket'.)
---> System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'System.Net.Sockets.Socket'.
at System.Net.Sockets.Socket.get_RemoteEndPoint()
at SocketClient.Client.endCommunication() in /Users/test/Documents/client/Program.cs:line 143
at SocketClient.ClientsSimulator.SequentialSimulation() in /Users/test/Documents/client/Program.cs:line 174
at SocketClient.ClientsSimulator.<ConcurrentSimulation>b__5_0() in /Users/test/Documents/client/Program.cs:line 191
at System.Threading.Tasks.Task.InnerInvoke()
at System.Threading.Tasks.Task.<>c.<.cctor>b__274_0(Object obj)
at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location where exception was thrown ---
at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
--- End of inner exception stack trace ---
at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
at System.Threading.Tasks.Task.Wait()
at SocketClient.ClientsSimulator.ConcurrentSimulation() in /Users/test/Documents/client/Program.cs:line 197
at SocketClient.Program.Main(String[] args) in /Users/test/Documents/client/Program.cs:line 219
The goal of ConcurrentSimulation() is to make multiple clients connect to the server from a single process instead of running many instances of the client.cs program (the server should be able to handle 200 clients at once, and running 200 instances of client.cs is a lot of work).
Please help me resolve this problem. The code is below.
Server.cs:
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using System.Threading;
namespace SocketServer
{
public class ClientInfo
{
public string studentnr { get; set; }
public string classname { get; set; }
public int clientid { get; set; }
public string teamname { get; set; }
public string ip { get; set; }
public string secret { get; set; }
public string status { get; set; }
}
public class Message
{
public const string welcome = "WELCOME";
public const string stopCommunication = "COMC-STOP";
public const string statusEnd = "STAT-STOP";
public const string secret = "SECRET";
}
public class SequentialServer
{
public Socket listener;
public IPEndPoint localEndPoint;
// Defining the IP address
public IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
// Defining the portnumber
public readonly int portNumber = 11111;
public String results = "";
public LinkedList<ClientInfo> clients = new LinkedList<ClientInfo>();
private Boolean stopCond = false;
private int processingTime = 1000;
private int listeningQueueSize = 5;
public void prepareServer()
{
byte[] bytes = new Byte[1024];
String data = null;
int numByte = 0;
string replyMsg = "";
bool stop;
try
{
Console.WriteLine("[Server] is ready to start ...");
// Establish the local endpoint
localEndPoint = new IPEndPoint(ipAddress, portNumber);
listener = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
Console.Out.WriteLine("[Server] A socket is established ...");
// associate a network address to the Server Socket. All clients must know this address
listener.Bind(localEndPoint);
// This is a non-blocking listen with max number of pending requests
listener.Listen(listeningQueueSize);
while (true)
{
Console.WriteLine("Waiting connection ... ");
// Suspend while waiting for incoming connection
Socket connection = listener.Accept();
this.sendReply(connection, Message.welcome);
stop = false;
while (!stop)
{
numByte = connection.Receive(bytes);
data = Encoding.ASCII.GetString(bytes, 0, numByte);
replyMsg = processMessage(data);
if (replyMsg.Equals(Message.stopCommunication))
{
stop = true;
break;
}
else
this.sendReply(connection, replyMsg);
}
}
}
catch (Exception e)
{
Console.Out.WriteLine(e.Message);
}
}
public void handleClient(Socket con)
{
}
public string processMessage(String msg)
{
Thread.Sleep(processingTime);
Console.WriteLine("[Server] received from the client -> {0} ", msg);
string replyMsg = "";
try
{
switch (msg)
{
case Message.stopCommunication:
replyMsg = Message.stopCommunication;
break;
default:
ClientInfo c = JsonSerializer.Deserialize<ClientInfo>(msg.ToString());
clients.AddLast(c);
if (c.clientid == -1)
{
stopCond = true;
exportResults();
}
c.secret = c.studentnr + Message.secret;
c.status = Message.statusEnd;
replyMsg = JsonSerializer.Serialize<ClientInfo>(c);
break;
}
}
catch (Exception e)
{
Console.Out.WriteLine("[Server] processMessage {0}", e.Message);
}
return replyMsg;
}
public void sendReply(Socket connection, string msg)
{
byte[] encodedMsg = Encoding.ASCII.GetBytes(msg);
connection.Send(encodedMsg);
}
public void exportResults()
{
if (stopCond)
{
this.printClients();
}
}
public void printClients()
{
string delimiter = " , ";
Console.Out.WriteLine("[Server] This is the list of clients communicated");
foreach (ClientInfo c in clients)
{
Console.WriteLine(c.classname + delimiter + c.studentnr + delimiter + c.clientid.ToString());
}
Console.Out.WriteLine("[Server] Number of handled clients: {0}", clients.Count);
clients.Clear();
stopCond = false;
}
}
public class ConcurrentServer
{
public static Socket listener;
public static IPEndPoint localEndPoint;
public static List<HostInfo> hosts;
public static IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
public static readonly int portNumber = 11111;
public static LinkedList<ClientInfo> clients = new LinkedList<ClientInfo>();
private static Boolean stopCond = false;
private static int processingTime = 1000;
private static int listeningQueueSize = 5;
public void prepareServer()
{
try
{
Console.WriteLine("[ConcurrentServer] is ready to start ...");
listener = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
localEndPoint = new IPEndPoint(ipAddress, portNumber);
listener.Bind(localEndPoint);
// A list of clients. To do: change the name of the list
hosts = new List<HostInfo>();
Thread listenThread = new Thread(ListenThread);
listenThread.Start();
Console.WriteLine("Waiting connection ... ");
}
catch (Exception e)
{
}
}
public static void ClientInfoThread(object clientSocket)
{
// A thread that handles the incoming and outgoing messages.
Socket cSocket = (Socket) clientSocket;
byte[] bytes = new Byte[1024];
String data = null;
int numByte = 0;
string replyMsg = "";
bool stop;
try
{
while (true)
{
// Suspend while waiting for incoming connection
//Socket connection = listener.Accept();
Console.WriteLine("Waiting connection ... ");
sendReply(cSocket, Message.welcome);
stop = false;
while (!stop)
{
numByte = cSocket.Receive(bytes);
data = Encoding.ASCII.GetString(bytes, 0, numByte);
replyMsg = processMessage(data);
if (replyMsg.Equals(Message.stopCommunication))
{
stop = true;
break;
}
else
sendReply(cSocket, replyMsg);
}
}
}catch(Exception e){
//catches exception
}
}
// a thread that adds each client socket that is trying to connect, in the list.
public static void ListenThread()
{
for( ; ; )
{
listener.Listen(0);
hosts.Add(new HostInfo(listener.Accept()));
}
}
public static string processMessage(String msg)
{
Thread.Sleep(processingTime);
Console.WriteLine("[ConcurrentServer] received from the client -> {0} ", msg);
string replyMsg = "";
try
{
switch (msg)
{
case Message.stopCommunication:
replyMsg = Message.stopCommunication;
break;
default:
ClientInfo c = JsonSerializer.Deserialize<ClientInfo>(msg.ToString());
clients.AddLast(c);
if (c.clientid == -1)
{
stopCond = true;
exportResults();
}
c.secret = c.studentnr + Message.secret;
c.status = Message.statusEnd;
replyMsg = JsonSerializer.Serialize<ClientInfo>(c);
break;
}
}
catch (Exception e)
{
}
return replyMsg;
}
public static void sendReply(Socket connection, string msg)
{
byte[] encodedMsg = Encoding.ASCII.GetBytes(msg);
connection.Send(encodedMsg);
}
public static void exportResults()
{
if (stopCond)
{
printClients();
}
}
public static void printClients()
{
string delimiter = " , ";
Console.Out.WriteLine("[ConcurrentServer] This is the list of clients communicated");
foreach (ClientInfo c in clients)
{
Console.WriteLine(c.classname + delimiter + c.studentnr + delimiter + c.clientid.ToString());
}
Console.Out.WriteLine("[ConcurrentServer] Number of handled clients: {0}", clients.Count);
clients.Clear();
stopCond = false;
}
}
public class HostInfo{
public Socket hostSocket;
public Thread hostThread;
public string id;
public HostInfo()
{
id = Guid.NewGuid().ToString();
hostThread= new Thread(ConcurrentServer.ClientInfoThread);
hostThread.Start(hostSocket);
}
public HostInfo(Socket hostSocket)
{
this.hostSocket = hostSocket;
id = Guid.NewGuid().ToString();
hostThread= new Thread(ConcurrentServer.ClientInfoThread);
hostThread.Start(hostSocket);
}
}
public class ServerSimulator
{
public static void sequentialRun()
{
Console.Out.WriteLine("[Server] A sample server, sequential version ...");
SequentialServer server = new SequentialServer();
server.prepareServer();
}
public static void concurrentRun()
{
Console.Out.WriteLine("[ConcurrentServer] A sample server, concurrent version ...");
ConcurrentServer server = new ConcurrentServer();
server.prepareServer();
}
}
class Program
{
// Main Method
static void Main(string[] args)
{
Console.Clear();
//ServerSimulator.sequentialRun();
// todo: uncomment this when the solution is ready.
ServerSimulator.concurrentRun();
}
}
}
client.cs:
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using System.Threading;
namespace SocketClient
{
public class ClientInfo
{
public string studentnr { get; set; }
public string classname { get; set; }
public int clientid { get; set; }
public string teamname { get; set; }
public string ip { get; set; }
public string secret { get; set; }
public string status { get; set; }
}
public class Message
{
public const string welcome = "WELCOME";
public const string stopCommunication = "COMC-STOP";
public const string statusEnd = "STAT-STOP";
public const string secret = "SECRET";
}
public class Client
{
public Socket clientSocket;
private ClientInfo info;
public IPEndPoint localEndPoint;
public IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
public readonly int portNumber = 11111;
public readonly int minWaitingTime = 50, maxWaitingTime = 100;
public int waitingTime = 0;
string baseStdNumber = "0700";
private String msgToSend;
public Client(bool finishing, int n)
{
waitingTime = new Random().Next(minWaitingTime, maxWaitingTime);
info = new ClientInfo();
info.classname = " INF2X ";
info.studentnr = this.baseStdNumber + n.ToString();
info.ip = "127.0.0.1";
info.clientid = finishing ? -1 : 1;
}
public string getClientInfo()
{
return JsonSerializer.Serialize<ClientInfo>(info);
}
public void prepareClient()
{
try
{
// Establish the remote endpoint for the socket.
localEndPoint = new IPEndPoint(ipAddress, portNumber);
// Creation TCP/IP Socket using
clientSocket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
}
catch (Exception e)
{
Console.Out.WriteLine("[Client] Preparation failed: {0}", e.Message);
}
}
public string processMessage(string msg)
{
Console.WriteLine("[Client] from Server -> {0}", msg);
string replyMsg = "";
try
{
switch (msg)
{
case Message.welcome:
replyMsg = this.getClientInfo();
break;
default:
ClientInfo c = JsonSerializer.Deserialize<ClientInfo>(msg.ToString());
if (c.status == Message.statusEnd)
{
replyMsg = Message.stopCommunication;
}
break;
}
}
catch (Exception e)
{
Console.Out.WriteLine("[Client] processMessage {0}", e.Message);
}
return replyMsg;
}
public void startCommunication()
{
Console.Out.WriteLine("[Client] **************");
Thread.Sleep(waitingTime);
// Data buffer
byte[] messageReceived = new byte[1024];
int numBytes = 0;
String rcvdMsg = null;
Boolean stop = false;
string reply = "";
try
{
// Connect Socket to the remote endpoint
clientSocket.Connect(localEndPoint);
// print connected EndPoint information
Console.WriteLine("[Client] connected to -> {0} ", clientSocket.RemoteEndPoint.ToString());
while (!stop)
{
// Receive the message using the method Receive().
numBytes = clientSocket.Receive(messageReceived);
rcvdMsg = Encoding.ASCII.GetString(messageReceived, 0, numBytes);
reply = this.processMessage(rcvdMsg);
this.sendReply(reply);
if (reply.Equals(Message.stopCommunication))
{
stop = true;
}
}
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
}
public void sendReply(string msg)
{
// Create the message to send
Console.Out.WriteLine("[Client] Message to be sent: {0}", msg);
byte[] messageSent = Encoding.ASCII.GetBytes(msg);
int byteSent = clientSocket.Send(messageSent);
}
public void endCommunication()
{
Console.Out.WriteLine("[Client] End of communication to -> {0} ", clientSocket.RemoteEndPoint.ToString());
clientSocket.Shutdown(SocketShutdown.Both);
clientSocket.Close();
}
}
public class ClientsSimulator
{
private int numberOfClients;
private Client[] clients;
public readonly int waitingTimeForStop = 2000;
public ClientsSimulator(int n, int t)
{
numberOfClients = n;
clients = new Client[numberOfClients];
for (int i = 0; i < numberOfClients; i++)
{
clients[i] = new Client(false, i);
}
}
public void SequentialSimulation()
{
while(true){
Console.Out.WriteLine("\n[ClientSimulator] Sequential simulator is going to start ...");
for (int i = 0; i < numberOfClients; i++)
{
clients[i].prepareClient();
clients[i].startCommunication();
clients[i].endCommunication();
}
Console.Out.WriteLine("\n[ClientSimulator] All clients finished with their communications ... ");
Thread.Sleep(waitingTimeForStop);
Client endClient = new Client(true, -1);
endClient.prepareClient();
endClient.startCommunication();
endClient.endCommunication();
}
}
public void ConcurrentSimulation()
{
Console.Out.WriteLine("[ClientSimulator] Concurrent simulator is going to start ...");
var t = Task.Run(() => SequentialSimulation() );
var t1 = Task.Run(() => SequentialSimulation() );
var t2 = Task.Run(() => SequentialSimulation() );
var t3 = Task.Run(() => SequentialSimulation() );
var t4= Task.Run(() => SequentialSimulation() );
var t5 = Task.Run(() => SequentialSimulation() );
t.Wait();
t1.Wait();
t2.Wait();
t3.Wait();
t4.Wait();
t5.Wait();
}
}
class Program
{
// Main Method
static void Main(string[] args)
{
Console.Clear();
int wt = 5000, nc = 20;
ClientsSimulator clientsSimulator = new ClientsSimulator(nc, wt);
//clientsSimulator.SequentialSimulation();
Thread.Sleep(wt);
// todo: Uncomment this, after finishing the method.
clientsSimulator.ConcurrentSimulation();
}
}
}
Upvotes: 1
Reputation: 2313
You have some race conditions going on.
Look at this:
public void ConcurrentSimulation()
{
Console.Out.WriteLine("[ClientSimulator] Concurrent simulator is going to start ...");
var t = Task.Run(() => SequentialSimulation() );
var t1 = Task.Run(() => SequentialSimulation() );
var t2 = Task.Run(() => SequentialSimulation() );
var t3 = Task.Run(() => SequentialSimulation() );
var t4= Task.Run(() => SequentialSimulation() );
var t5 = Task.Run(() => SequentialSimulation() );
t.Wait();
t1.Wait();
t2.Wait();
t3.Wait();
t4.Wait();
t5.Wait();
}
You run SequentialSimulation six times concurrently on the same ClientsSimulator instance. On each pass, this method gives every client you've queued up for the simulation a new clientSocket Socket object. Because all six tasks share the same clients array, each client's clientSocket field is overwritten several times, and the rest of the code then runs against the same socket from several tasks (you're creating several sockets per client and only using the last one).
This means that at some point one task closes the socket, and when the remaining tasks continue to use the same underlying clientSocket (for example, reading RemoteEndPoint in endCommunication()), you get the error.
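To see the last step of that sequence in isolation, here is a minimal sketch. The class name DisposedSocketDemo and the variable names are made up for illustration, and no server is needed because the socket is never connected. It only shows that reading RemoteEndPoint on a socket that has already been closed throws the same exception type as in the stack trace.

```csharp
using System;
using System.Net.Sockets;

class DisposedSocketDemo
{
    static void Main()
    {
        // Stands in for the clientSocket field that several tasks share.
        var sharedSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        // "Task A" finishes first and closes the socket.
        sharedSocket.Close();

        // "Task B" still holds a reference to the same object and tries to use it.
        try
        {
            var endPoint = sharedSocket.RemoteEndPoint;
            Console.WriteLine("No exception, endpoint: {0}", endPoint);
        }
        catch (ObjectDisposedException e)
        {
            Console.WriteLine("Caught: {0}", e.GetType().Name);
        }
    }
}
```

In the real program the two steps happen on different thread-pool threads, so which task hits the disposed socket varies from run to run.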
You need each SequentialSimulation call to run with its own set of Client objects, and you need to otherwise ensure that you don't attempt to access a socket after you have disposed of it.
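One way to get there is to let every task build and own its clients, so no Socket is ever visible to two tasks. The following is a rough sketch rather than a drop-in fix: DemoClient, RunSimulation and the constants are hypothetical stand-ins for the question's Client class and SequentialSimulation(), and the stand-in only creates and closes a socket so the example runs without a server.

```csharp
using System;
using System.Net.Sockets;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Client class.
class DemoClient
{
    private Socket socket;

    public void Prepare()
    {
        socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    }

    public void End()
    {
        socket.Close();
    }
}

class Program
{
    // One pass over a set of clients that belongs only to the calling task.
    static void RunSimulation(int numberOfClients)
    {
        var clients = new DemoClient[numberOfClients];
        for (int i = 0; i < numberOfClients; i++)
        {
            clients[i] = new DemoClient();
            clients[i].Prepare();
            clients[i].End();
        }
    }

    static void Main()
    {
        const int simulations = 6;           // same number of tasks as in the question
        const int clientsPerSimulation = 20; // same as nc in the question's Main

        var tasks = new Task[simulations];
        for (int t = 0; t < simulations; t++)
        {
            // Each task creates its own clients inside RunSimulation,
            // so nothing is shared between tasks.
            tasks[t] = Task.Run(() => RunSimulation(clientsPerSimulation));
        }

        Task.WaitAll(tasks);
        Console.WriteLine("All simulations finished");
    }
}
```

Applied to the code in the question, the equivalent change would be to construct a separate ClientsSimulator (or a separate Client array) inside each Task.Run instead of calling SequentialSimulation() on the shared instance. Note also that SequentialSimulation() as posted loops in while(true), so the Wait() calls would never return even once the exception is gone.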
Upvotes: 1