Ahmed

Reputation: 129

Multithreaded server gives "Cannot access a disposed object" error when multiple clients try to connect

I have a multithreaded server (server.cs) and a client (client.cs). The objective of the program is as follows:

Each client program creates a connection to the server. The server replies with a welcome message (a string). The client sends the server a JSON-formatted string. The server replies in the same format, with added information on the secret and the END status. The client then sends a message telling the server to stop the communication and closes the connection. When all the clients have communicated, a final client with id = -1 informs the server to stop. When the server receives the message from this ending client (id = -1), it must print all collected information and the number of clients that communicated.

The biggest part is done. When I use the SequentialSimulation() method in client.cs and run many instances of the client program, the server works fine and does what the description above says. But when I use the ConcurrentSimulation() method in client.cs and run only one instance of the client program, it crashes with the following error:

Unhandled exception. System.AggregateException: One or more errors occurred. (Cannot access a disposed object.
Object name: 'System.Net.Sockets.Socket'.)
 ---> System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'System.Net.Sockets.Socket'.
   at System.Net.Sockets.Socket.get_RemoteEndPoint()
   at SocketClient.Client.endCommunication() in /Users/test/Documents/client/Program.cs:line 143
   at SocketClient.ClientsSimulator.SequentialSimulation() in /Users/test/Documents/client/Program.cs:line 174
   at SocketClient.ClientsSimulator.<ConcurrentSimulation>b__5_0() in /Users/test/Documents/client/Program.cs:line 191
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.<>c.<.cctor>b__274_0(Object obj)
   at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location where exception was thrown ---
   at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at System.Threading.Tasks.Task.Wait()
   at SocketClient.ClientsSimulator.ConcurrentSimulation() in /Users/test/Documents/client/Program.cs:line 197
   at SocketClient.Program.Main(String[] args) in /Users/test/Documents/client/Program.cs:line 219

The goal of ConcurrentSimulation() is to make multiple clients connect to the server without running multiple instances of the client.cs program (the server should be able to handle 200 clients at once, and running 200 instances of client.cs is a lot of work).

Please help me resolve this problem. The code is below.

Server.cs:

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using System.Threading;

namespace SocketServer
{
    public class ClientInfo
    {
        public string studentnr { get; set; }
        public string classname { get; set; }
        public int clientid { get; set; }
        public string teamname { get; set; }
        public string ip { get; set; }
        public string secret { get; set; }
        public string status { get; set; }
    }

    public class Message
    {
        public const string welcome = "WELCOME";
        public const string stopCommunication = "COMC-STOP";
        public const string statusEnd = "STAT-STOP";
        public const string secret = "SECRET";
    }

    public class SequentialServer
    {
        public Socket listener;
        public IPEndPoint localEndPoint;
        // Defining the IP address
        public IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
        // Defining the portnumber
        public readonly int portNumber = 11111;

        public String results = "";
        public LinkedList<ClientInfo> clients = new LinkedList<ClientInfo>();

        private Boolean stopCond = false;
        private int processingTime = 1000;
        private int listeningQueueSize = 5;

        public void prepareServer()
        {
            byte[] bytes = new Byte[1024];
            String data = null;
            int numByte = 0;
            string replyMsg = "";
            bool stop;

            try
            {
                Console.WriteLine("[Server] is ready to start ...");
                // Establish the local endpoint
                localEndPoint = new IPEndPoint(ipAddress, portNumber);
                listener = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                Console.Out.WriteLine("[Server] A socket is established ...");
                // associate a network address to the Server Socket. All clients must know this address
                listener.Bind(localEndPoint);
                // This is a non-blocking listen with max number of pending requests
                listener.Listen(listeningQueueSize);
                while (true)
                {
                    Console.WriteLine("Waiting connection ... ");
                    // Suspend while waiting for incoming connection 
                    Socket connection = listener.Accept();
                    this.sendReply(connection, Message.welcome);

                    stop = false;
                    while (!stop)
                    {
                        numByte = connection.Receive(bytes);
                        data = Encoding.ASCII.GetString(bytes, 0, numByte);
                        replyMsg = processMessage(data);
                        if (replyMsg.Equals(Message.stopCommunication))
                        {
                            stop = true;
                            break;
                        }
                        else
                            this.sendReply(connection, replyMsg);
                    }

                }

            }
            catch (Exception e)
            {
                Console.Out.WriteLine(e.Message);
            }
        }
        public void handleClient(Socket con)
        {
        }
        public string processMessage(String msg)
        {
            Thread.Sleep(processingTime);
            Console.WriteLine("[Server] received from the client -> {0} ", msg);
            string replyMsg = "";

            try
            {
                switch (msg)
                {
                    case Message.stopCommunication:
                        replyMsg = Message.stopCommunication;
                        break;
                    default:
                        ClientInfo c = JsonSerializer.Deserialize<ClientInfo>(msg.ToString());
                        clients.AddLast(c);
                        if (c.clientid == -1)
                        {
                            stopCond = true;
                            exportResults();
                        }
                        c.secret = c.studentnr + Message.secret;
                        c.status = Message.statusEnd;
                        replyMsg = JsonSerializer.Serialize<ClientInfo>(c);
                        break;
                }
            }
            catch (Exception e)
            {
                Console.Out.WriteLine("[Server] processMessage {0}", e.Message);
            }

            return replyMsg;
        }
        public void sendReply(Socket connection, string msg)
        {
            byte[] encodedMsg = Encoding.ASCII.GetBytes(msg);
            connection.Send(encodedMsg);
        }
        public void exportResults()
        {
            if (stopCond)
            {
                this.printClients();
            }
        }
        public void printClients()
        {
            string delimiter = " , ";
            Console.Out.WriteLine("[Server] This is the list of clients communicated");
            foreach (ClientInfo c in clients)
            {
                Console.WriteLine(c.classname + delimiter + c.studentnr + delimiter + c.clientid.ToString());
            }
            Console.Out.WriteLine("[Server] Number of handled clients: {0}", clients.Count);

            clients.Clear();
            stopCond = false;

        }
    }



    public class ConcurrentServer
    {
        public  static Socket listener;
        public static IPEndPoint localEndPoint;
        public static List<HostInfo> hosts;
        public static IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
        public static readonly int portNumber = 11111;
        public static LinkedList<ClientInfo> clients = new LinkedList<ClientInfo>();

        private static Boolean stopCond = false;
        private static int processingTime = 1000;
        private static int listeningQueueSize = 5;


        public void prepareServer()
        {


            try
            {
                Console.WriteLine("[ConcurrentServer] is ready to start ...");


                listener = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                localEndPoint = new IPEndPoint(ipAddress, portNumber);
                listener.Bind(localEndPoint);
                // A list of clients (to do: change the name of the list)
                hosts = new List<HostInfo>();
                Thread listenThread = new Thread(ListenThread);

                listenThread.Start();

                Console.WriteLine("Waiting connection ... ");

            }
            catch (Exception e)
            {
            }


        }

        public static  void ClientInfoThread(object clientSocket)
        {
            // A thread that handles the incoming and outgoing messages.
            Socket cSocket = (Socket) clientSocket;

            byte[] bytes = new Byte[1024];
            String data = null;
            int numByte = 0;
            string replyMsg = "";
            bool stop;

            try

            {
            while (true)
            {
                // Suspend while waiting for incoming connection 
                //Socket connection = listener.Accept();
                Console.WriteLine("Waiting connection ... ");
                sendReply(cSocket, Message.welcome);

                stop = false;
                while (!stop)
                {
                    numByte = cSocket.Receive(bytes);
                    data = Encoding.ASCII.GetString(bytes, 0, numByte);
                    replyMsg = processMessage(data);
                    if (replyMsg.Equals(Message.stopCommunication))
                    {
                        stop = true;
                        break;
                    }
                    else
                        sendReply(cSocket, replyMsg);
                }

            }


            }catch(Exception e){
                //catches exception
            }

        }
        // a thread that adds each client socket  that is trying to connect, in the list.
        public static void ListenThread()
        {
            for( ; ; )
            {
            listener.Listen(0);
            hosts.Add(new HostInfo(listener.Accept()));  
            }
        }
        public  static string processMessage(String msg)
        {
            Thread.Sleep(processingTime);
            Console.WriteLine("[ConcurrentServer] received from the client -> {0} ", msg);
            string replyMsg = "";

            try
            {
                switch (msg)
                {
                    case Message.stopCommunication:
                        replyMsg = Message.stopCommunication;
                        break;
                    default:
                        ClientInfo c = JsonSerializer.Deserialize<ClientInfo>(msg.ToString());
                        clients.AddLast(c);
                        if (c.clientid == -1)
                        {
                            stopCond = true;
                            exportResults();
                        }
                        c.secret = c.studentnr + Message.secret;
                        c.status = Message.statusEnd;
                        replyMsg = JsonSerializer.Serialize<ClientInfo>(c);
                        break;
                }
            }
            catch (Exception e)
            {
            }

            return replyMsg;
        }
        public static void sendReply(Socket connection, string msg)
        {
            byte[] encodedMsg = Encoding.ASCII.GetBytes(msg);
            connection.Send(encodedMsg);
        }
        public static void exportResults()
        {
            if (stopCond)
            {
                printClients();
            }
        }
        public static void printClients()
        {
            string delimiter = " , ";
            Console.Out.WriteLine("[ConcurrentServer] This is the list of clients communicated");
            foreach (ClientInfo c in clients)
            {
                Console.WriteLine(c.classname + delimiter + c.studentnr + delimiter + c.clientid.ToString());
            }
            Console.Out.WriteLine("[ConcurrentServer] Number of handled clients: {0}", clients.Count);

            clients.Clear();
            stopCond = false;

        }





    }

    public class HostInfo{

        public Socket hostSocket;
        public Thread hostThread;
        public string id;
        public HostInfo()
        {
            id = Guid.NewGuid().ToString();
            hostThread= new Thread(ConcurrentServer.ClientInfoThread);
            hostThread.Start(hostSocket);

        }
         public HostInfo(Socket hostSocket)
        {
            this.hostSocket = hostSocket;
            id = Guid.NewGuid().ToString();
            hostThread= new Thread(ConcurrentServer.ClientInfoThread);
            hostThread.Start(hostSocket);

        }

    }



    public class ServerSimulator
    {
        public static void sequentialRun()
        {
            Console.Out.WriteLine("[Server] A sample server, sequential version ...");
            SequentialServer server = new SequentialServer();
            server.prepareServer();
        }
        public static void concurrentRun()

        {
            Console.Out.WriteLine("[ConcurrentServer] A sample server, concurrent version ...");
            ConcurrentServer server = new ConcurrentServer();
            server.prepareServer();
        }
    }
    class Program
    {
        // Main Method 
        static void Main(string[] args)
        {
            Console.Clear();
           //ServerSimulator.sequentialRun();
            // todo: uncomment this when the solution is ready.
            ServerSimulator.concurrentRun();
        }

    }
}

client.cs:

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using System.Threading;

namespace SocketClient
{
    public class ClientInfo
    {
        public string studentnr { get; set; }
        public string classname { get; set; }
        public int clientid { get; set; }
        public string teamname { get; set; }
        public string ip { get; set; }
        public string secret { get; set; }
        public string status { get; set; }
    }

    public class Message
    {
        public const string welcome = "WELCOME";
        public const string stopCommunication = "COMC-STOP";
        public const string statusEnd = "STAT-STOP";
        public const string secret = "SECRET";
    }

    public class Client
    {
        public Socket clientSocket;
        private ClientInfo info;
        public IPEndPoint localEndPoint;
        public IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
        public readonly int portNumber = 11111;
        public readonly int minWaitingTime = 50, maxWaitingTime = 100;
        public int waitingTime = 0;
        string baseStdNumber = "0700";

        private String msgToSend;

        public Client(bool finishing, int n)
        {
            waitingTime = new Random().Next(minWaitingTime, maxWaitingTime);
            info = new ClientInfo();
            info.classname = " INF2X ";
            info.studentnr = this.baseStdNumber + n.ToString();
            info.ip = "127.0.0.1";
            info.clientid = finishing ? -1 : 1;
        }

        public string getClientInfo()
        {
            return JsonSerializer.Serialize<ClientInfo>(info);
        }
        public void prepareClient()
        {
            try
            {
                // Establish the remote endpoint for the socket.
                localEndPoint = new IPEndPoint(ipAddress, portNumber);
                // Creation TCP/IP Socket using  
                clientSocket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
            }
            catch (Exception e)
            {
                Console.Out.WriteLine("[Client] Preparation failed: {0}", e.Message);
            }
        }
        public string processMessage(string msg)
        {
            Console.WriteLine("[Client] from Server -> {0}", msg);
            string replyMsg = "";

            try
            {
                switch (msg)
                {
                    case Message.welcome:
                        replyMsg = this.getClientInfo();
                        break;
                    default:
                        ClientInfo c = JsonSerializer.Deserialize<ClientInfo>(msg.ToString());
                        if (c.status == Message.statusEnd)
                        {
                            replyMsg = Message.stopCommunication;
                        }
                        break;
                }
            }
            catch (Exception e)
            {
                Console.Out.WriteLine("[Client] processMessage {0}", e.Message);
            }
            return replyMsg;
        }
        public void startCommunication()
        {
            Console.Out.WriteLine("[Client] **************");
            Thread.Sleep(waitingTime);
            // Data buffer 
            byte[] messageReceived = new byte[1024];
            int numBytes = 0;
            String rcvdMsg = null;
            Boolean stop = false;
            string reply = "";

            try
            {
                // Connect Socket to the remote endpoint 
                clientSocket.Connect(localEndPoint);
                // print connected EndPoint information  
                Console.WriteLine("[Client] connected to -> {0} ", clientSocket.RemoteEndPoint.ToString());

                while (!stop)
                {
                    // Receive the message using the method Receive().
                    numBytes = clientSocket.Receive(messageReceived);
                    rcvdMsg = Encoding.ASCII.GetString(messageReceived, 0, numBytes);
                    reply = this.processMessage(rcvdMsg);
                    this.sendReply(reply);
                    if (reply.Equals(Message.stopCommunication))
                    {
                        stop = true;
                    }
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }
        public void sendReply(string msg)
        {
            // Create the message to send
            Console.Out.WriteLine("[Client] Message to be sent: {0}", msg);
            byte[] messageSent = Encoding.ASCII.GetBytes(msg);
            int byteSent = clientSocket.Send(messageSent);
        }
        public void endCommunication()
        {
            Console.Out.WriteLine("[Client] End of communication to -> {0} ", clientSocket.RemoteEndPoint.ToString());
            clientSocket.Shutdown(SocketShutdown.Both);
            clientSocket.Close();
        }
    }

    public class ClientsSimulator
    {
        private int numberOfClients;
        private Client[] clients;
        public readonly int waitingTimeForStop = 2000;


        public ClientsSimulator(int n, int t)
        {
            numberOfClients = n;
            clients = new Client[numberOfClients];
            for (int i = 0; i < numberOfClients; i++)
            {
                clients[i] = new Client(false, i);
            }
        }

        public void SequentialSimulation()
        {
            while(true){
            Console.Out.WriteLine("\n[ClientSimulator] Sequential simulator is going to start ...");
            for (int i = 0; i < numberOfClients; i++)
            {
                clients[i].prepareClient();
                clients[i].startCommunication();
                clients[i].endCommunication();
            }

            Console.Out.WriteLine("\n[ClientSimulator] All clients finished with their communications ... ");

           Thread.Sleep(waitingTimeForStop);

            Client endClient = new Client(true, -1);
            endClient.prepareClient();
            endClient.startCommunication();
            endClient.endCommunication();
        }
        }

        public void ConcurrentSimulation()
        {
            Console.Out.WriteLine("[ClientSimulator] Concurrent simulator is going to start ...");
            var t = Task.Run(() => SequentialSimulation() );
            var t1 = Task.Run(() => SequentialSimulation() );
            var t2 = Task.Run(() => SequentialSimulation() );
            var t3 = Task.Run(() => SequentialSimulation() );
            var t4= Task.Run(() => SequentialSimulation() );
            var t5 = Task.Run(() => SequentialSimulation() );
            t.Wait();
            t1.Wait();
            t2.Wait();
            t3.Wait();
            t4.Wait();
            t5.Wait();



        }
    }
    class Program
    {
        // Main Method 
        static void Main(string[] args)
        {
            Console.Clear();
            int wt = 5000, nc = 20;
            ClientsSimulator clientsSimulator = new ClientsSimulator(nc, wt);
            //clientsSimulator.SequentialSimulation();
            Thread.Sleep(wt);
            // todo: Uncomment this, after finishing the method.
            clientsSimulator.ConcurrentSimulation();

        }
    }
}

Upvotes: 1

Views: 1345

Answers (1)

gabriel.hayes

Reputation: 2313

You have some race conditions going on.

Look at this:

  public void ConcurrentSimulation()
        {
            Console.Out.WriteLine("[ClientSimulator] Concurrent simulator is going to start ...");
            var t = Task.Run(() => SequentialSimulation() );
            var t1 = Task.Run(() => SequentialSimulation() );
            var t2 = Task.Run(() => SequentialSimulation() );
            var t3 = Task.Run(() => SequentialSimulation() );
            var t4= Task.Run(() => SequentialSimulation() );
            var t5 = Task.Run(() => SequentialSimulation() );
            t.Wait();
            t1.Wait();
            t2.Wait();
            t3.Wait();
            t4.Wait();
            t5.Wait();



        }

You run SequentialSimulation six times concurrently (tasks t through t5) on the same ClientsSimulator instance. The method gives each of the clients you've queued up for the simulation a new clientSocket Socket object, but because all six tasks share the same Client objects, each client gets up to six sockets created for it and only the last one assigned is kept. The rest of the code then runs against that same socket from six threads.

This means that at some point one thread closes the socket, and when the remaining threads continue to use the same underlying clientSocket, you get the error.
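The failure itself can be reproduced without any threads. The following is a minimal sketch, not taken from the question's code: the class and method names (DisposedSocketDemo, ThrowsAfterClose) are made up for illustration. It only shows that reading RemoteEndPoint on a socket that has already been closed throws the same ObjectDisposedException seen in the stack trace.

```csharp
using System;
using System.Net.Sockets;

public static class DisposedSocketDemo
{
    // Hypothetical helper: returns true if reading RemoteEndPoint
    // on a closed socket throws ObjectDisposedException.
    public static bool ThrowsAfterClose()
    {
        var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        // This is what endCommunication() does in one thread...
        socket.Close();

        try
        {
            // ...and this is what another thread still holding the same
            // Client object does a moment later.
            var endPoint = socket.RemoteEndPoint;
            return false;
        }
        catch (ObjectDisposedException)
        {
            return true;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Throws after Close: {0}", ThrowsAfterClose());
    }
}
```

In the question's program the Close() and the RemoteEndPoint read happen on different tasks, which is why the crash only shows up in ConcurrentSimulation().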

You need each SequentialSimulation to run with its own set of Client objects, and you need to otherwise ensure that you don't attempt to access a socket after you have disposed of it.
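One way to do that is sketched below, under some assumptions. The Client class here is a simplified stand-in for the one in the question (a flag replaces the real socket, and Run() is a hypothetical method that collapses prepareClient, startCommunication and endCommunication into one call), and the ConcurrentSimulation signature with taskCount and clientsPerTask is invented for the example. The point is only the ownership pattern: every task creates its own Client objects and nothing is shared between tasks.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Simplified stand-in for the question's Client class (hypothetical):
// it owns one resource that becomes unusable after it is closed.
public class Client
{
    private bool closed;

    public int Id { get; }

    public Client(int id) { Id = id; }

    // Stands in for prepareClient + startCommunication + endCommunication.
    public void Run()
    {
        if (closed) throw new ObjectDisposedException(nameof(Client));
        // ... connect and exchange messages with the server here ...
        closed = true; // after endCommunication() the socket is gone
    }
}

public static class ClientsSimulator
{
    // Each task builds its OWN Client objects, so no socket is ever
    // touched by more than one thread. Returns the number of clients run.
    public static int ConcurrentSimulation(int taskCount, int clientsPerTask)
    {
        Task<int>[] tasks = Enumerable.Range(0, taskCount)
            .Select(t => Task.Run(() =>
            {
                int finished = 0;
                for (int i = 0; i < clientsPerTask; i++)
                {
                    var client = new Client(t * clientsPerTask + i);
                    client.Run();
                    finished++;
                }
                return finished;
            }))
            .ToArray();

        Task.WaitAll(tasks);
        return tasks.Sum(task => task.Result);
    }

    public static void Main()
    {
        int total = ConcurrentSimulation(taskCount: 6, clientsPerTask: 20);
        Console.WriteLine("Clients finished: {0}", total);
    }
}
```

With real sockets the same shape should apply: construct the Client inside the task (or give each task its own ClientsSimulator) rather than looping over a shared clients array, and send the ending client with id = -1 only once, after Task.WaitAll has returned.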

Upvotes: 1
