Umer Azaz
Umer Azaz

Reputation: 455

Network stream write is blocked

I am working on a c# application (.net 4) that accepts multiple tcp connections from different clients. There is a single tcp listener that accepts socket. Communication b/w nodes in duplex. Data is sent using Networkstream.Write method and read using Networkstream.read method. For each tcp connection a seperate thread is created.

The problem is, a few days ago we noticed that one of the clients stopped reading data (due to a bug) for 20 minutes. As the connection was not broken, there was no (IO) exception at the server. However, we noticed that data at the other clients was also not going. After 20 minutes, that client again started receiving the data and soon other clients also started receiving the data.

I know that Network stream's write method is a blocking method and we are not using any timeouts. So there is a potential that write has blocked (described here). But as I understood it, there has to be a seperate write buffer for each tcp connection or is there something more at play. Can a send blockage at a tcp connection, effect other tcp connections in the same application?

Here is the pseudo code for write operation. For each connection there is a seperate outgoing queue process by a seperate thread.

public class TCPServerListener : baseConnection
{

    private readonly int _Port;
    private TcpListener _tcpListener;
    private Thread _thread;
    private List<TcpClientData> _tcpClientDataList = new List<TcpClientData>();
    private long _messageDiscardTimeout;
    private bool LoopForClientConnection = true;

    public TCPServerListener(int port, ThreadPriority threadPriority)
    {
        try
        {
            // init property
        }
        catch (Exception ex)
        {
            // log
        }
    }

    public void SendMessageToAll(int type)
    {
        base.EnqueueMessageToSend(type, _tcpClientDataList);
    }
    public void SendMessageToList(int type, IList<TcpClient> tcpClientList)
    {
        base.EnqueueMessageToSend(type, tcpClientList);
    }
    public void SendMessage(int type, TcpClient tcpClient)
    {
        base.EnqueueMessageToSend(type, tcpClient);
    }



    private void AcceptClientConnections()
    {
        while (LoopForClientConnection)
        {
            try
            {
                Socket socket = _tcpListener.AcceptSocket();
                TcpClientData tcpClientData = new TcpClientData();
                tcpClientData.tcpClientThread = new Thread(new ParameterizedThreadStart(StartAsync));
                tcpClientData.tcpClientThread.Priority = _threadPriority;
                tcpClientData.tcpClientThread.IsBackground = true;
                tcpClientData.tcpClientThread.Name = "CD" + tcpClientData.tcpClientThread.ManagedThreadId;
                tcpClientData.tcpClient = new TcpClient();
                tcpClientData.tcpClient.Client = socket;
                _tcpClientDataList.Add(tcpClientData);
                tcpClientData.tcpClientThread.Start(tcpClientData.tcpClient);
            }
            catch (ThreadAbortException ex)
            {
                //log

            }
            catch (Exception ex)
            {
                //log
            }
        }
    }




    public override void Start()
    {
        base.Start();
        _tcpListener = new TcpListener(System.Net.IPAddress.Any, _Port);

        _thread = new Thread(AcceptClientConnections);
        _thread.Priority = _threadPriority;
        _thread.IsBackground = true;

        _tcpListener.Start();
        _thread.Start();
    }

    public override void Stop()
    {
       // stop listener and terminate threads
    }
}


public class baseConnection
{
    private Thread _InCommingThread;
    private Thread _OutGoingThread;
    protected ThreadPriority _threadPriority;
    protected BlockingCollection<MessageReceived> _InComingMessageQueue = new BlockingCollection<MessageReceived>();
    protected BlockingCollection<MessageToSend> _OutgoingMessageQueue = new BlockingCollection<MessageToSend>();

    public void StartAsync(Object oTcpClient)
    {
        TcpClient tcpClient = oTcpClient as TcpClient;
        if (tcpClient == null)
            return;

        using (tcpClient)
        {
            using (NetworkStream stream = tcpClient.GetStream())
            {
                stream.ReadTimeout = Timeout.Infinite;
                stream.WriteTimeout = Timeout.Infinite;

                BinaryReader bodyReader = new BinaryReader(stream);

                while (tcpClient.Connected)
                {
                    try
                    {
                        int messageType = bodyReader.ReadInt32();

                        // checks to verify messages 

                        // enqueue message in incoming queue
                        _InComingMessageQueue.Add(new MessageReceived(messageType, tcpClient));
                    }
                    catch (EndOfStreamException ex)
                    {
                        // log
                        break;
                    }
                    catch (Exception ex)
                    {
                        // log
                        Thread.Sleep(100);
                    }
                }
                //RaiseDisconnected(tcpClient);
            }
        }
    }


    public virtual void Start()
    {
        _InCommingThread = new Thread(HandleInCommingMessnge);
        _InCommingThread.Priority = _threadPriority;
        _InCommingThread.IsBackground = true;
        _InCommingThread.Start();

        _OutGoingThread = new Thread(HandleOutgoingQueue);
        _OutGoingThread.Priority = _threadPriority;
        _OutGoingThread.IsBackground = true;
        _OutGoingThread.Start();
    }


    public virtual void Stop()
    {
       // stop the threads and free up resources
    }

    protected void EnqueueMessageToSend(int type, List<TcpClientData> tcpClientDataList)
    {
        tcpClientDataList.ForEach(x => _OutgoingMessageQueue.Add(new MessageToSend(type, x.tcpClient)));
    }
    protected void EnqueueMessageToSend(int type, IList<TcpClient> tcpClientList)
    {
        foreach (TcpClient tcpClient in tcpClientList)
        {
            _OutgoingMessageQueue.Add(new MessageToSend(type, tcpClient));
        }
    }
    protected void EnqueueMessageToSend(int type, TcpClient tcpClient)
    {
        _OutgoingMessageQueue.Add(new MessageToSend(type, tcpClient));
    }


    private void HandleOutgoingQueue()
    {
        while (true)
        {
            try
            {

                MessageToSend message = _OutgoingMessageQueue.Take();

                if (message.tcpClient.Connected)
                {
                    BinaryWriter writer = new BinaryWriter(message.tcpClient.GetStream());
                    writer.Write(message.type);
                }
            }
            catch (ThreadAbortException ex)
            {
                // log
                return;
            }
            catch (Exception ex)
            {
                //_logger.Error(ex.Message, ex);
            }
        }
    }

    private void HandleInCommingMessnge()
    {
        while (true)
        {
            try
            {
                MessageReceived messageReceived = _InComingMessageQueue.Take();

                // handle message
            }
            catch (ThreadAbortException ex)
            {
                // log
                return;
            }
            catch (Exception ex)
            {
                // log
                //_logger.Error(ex.Message, ex);
            }
        }
    }

    public class MessageReceived
    {
        public MessageReceived(int type, TcpClient tcpClient)
        {
            this.tcpClient = tcpClient;
            this.type = type;
        }

        public int type;
        public TcpClient tcpClient;
    }

    public class MessageToSend
    {
        public MessageToSend(int type, TcpClient tcpClient)
        {
            this.tcpClient = tcpClient;
            this.type = type;
        }

        public int type;
        public TcpClient tcpClient;
    }

    public class TcpClientData
    {
        public Thread tcpClientThread;
        public TcpClient tcpClient;
    }
}

Upvotes: 1

Views: 1939

Answers (1)

C.Evenhuis
C.Evenhuis

Reputation: 26446

You mention that for each connection a separate thread is created, but the code you have shown seems to be able to dequeue a message for any connection.

If this code is running on multiple threads, the program will block as soon as every thread is currently trying to send a message to the blocking connection. Another problem you may face if this loop runs on multiple threads is that messages may not arrive in the correct order for the same connection.

Upvotes: 1

Related Questions