Reputation: 32258
I have the following method within my TcpServer:
    private void Receive(ZClient client)
    {
        NetworkStream ns = client.TcpClient.GetStream();
        int bytesRead = 0;
        byte[] message = new byte[client.TcpClient.ReceiveBufferSize];
        while (true)
        {
            bytesRead = ns.Read(message, 0, client.TcpClient.ReceiveBufferSize);
            if (bytesRead == 0)
            {
                break; // Exit Thread
            }
            else
            {
                Array.Resize<Byte>(ref client.Buffer, client.Buffer.Length + bytesRead);
                Buffer.BlockCopy(message, 0, client.Buffer, client.Buffer.Length - bytesRead, bytesRead);
                // Issue Below
                DisassembleRawMessage(client); // *1 See below
                new Thread(()=>DeQueueMessages(client)).Start(); // *2 See below
                // End Issue
            }
        }
    }
DisassembleRawMessage is responsible for processing my buffer and enqueuing individual messages in my client message queue. No problem here.
DeQueueMessages is responsible for processing the objects previously placed into my client message queue by the DisassembleRawMessage method.
The problem may seem pretty obvious. I spin off a new thread after each message is received. While the individual messages are being dequeued and processed, more items may be added to the queue, since more data may have been received in the interim. This results in an enumeration error telling me that I may not modify the contents of the collection during enumeration.
I don't want to block my receive method by processing the queue on the same thread, as that would defeat the purpose of letting the queue build up requests while still receiving data. How can I continue to queue objects and, at the same time, process the objects within the queue without running into enumeration modification exceptions?
Update: The method that dequeues the messages.
    while (client.Queue.Count() > 0)
    {
        byte[] buffer = client.Queue.Dequeue();
        ...
    }
Upvotes: 1
Views: 159
Reputation: 54148
Async I/O is often a good solution to the perceived need for extra threads in a network I/O scenario.
If you don't want to block on the Read, use async I/O via NetworkStream.BeginRead. There is no need for extra threads unless you want the callback to hand off long-running work on the received data to the thread pool. You could do this using one of the concurrent collections in .NET 4, ConcurrentQueue or BlockingCollection.
This code as it stands has the potential to result in a tight loop if there is no data available on the NetworkStream. The example here suggests that you have to check DataAvailable before calling Read in a loop like this. Without that check, this code seems problematic.
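As a rough illustration of the BlockingCollection hand-off mentioned above, here is a minimal sketch. The class name BlockingQueueSketch, the Run method, and the use of the payload length as a stand-in for real message handling are all assumptions for illustration; they are not part of the original code. One long-lived consumer drains the collection while the producer (the receive loop, in the question) keeps adding to it, so no per-message thread is needed and nothing is enumerated while it is being modified unsafely.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BlockingQueueSketch
{
    // Hypothetical helper: feeds messages through a BlockingCollection and
    // returns the payload lengths in the order the consumer handled them.
    public static List<int> Run(IEnumerable<byte[]> incoming)
    {
        // BlockingCollection wraps a ConcurrentQueue (FIFO) by default.
        var queue = new BlockingCollection<byte[]>();
        var processed = new List<int>();

        // Single long-lived consumer. GetConsumingEnumerable blocks while the
        // collection is empty and finishes once CompleteAdding has been called
        // and the remaining items are drained.
        Task consumer = Task.Factory.StartNew(() =>
        {
            foreach (byte[] message in queue.GetConsumingEnumerable())
            {
                processed.Add(message.Length); // stand-in for real processing
            }
        }, TaskCreationOptions.LongRunning);

        // Producer side: in the question this would be the receive loop,
        // adding each disassembled message as it arrives.
        foreach (byte[] message in incoming)
        {
            queue.Add(message);
        }
        queue.CompleteAdding(); // e.g. when the client disconnects

        consumer.Wait();
        return processed;
    }
}
```

Calling BlockingQueueSketch.Run with three arrays of lengths 3, 5 and 1 returns the list 3, 5, 1, because a single consumer reads the default FIFO collection in insertion order.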
Upvotes: 1
Reputation: 27962
Instead of using a custom queue, you can consider using the ThreadPool.QueueUserWorkItem method for queuing the message to be processed on the thread pool.
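A minimal sketch of that idea follows. ThreadPoolSketch, ProcessMessage, and the byte-counting logic are hypothetical names and stand-ins, not taken from the question; the CountdownEvent is only there so the example can wait for all work items to finish.

```csharp
using System;
using System.Threading;

public static class ThreadPoolSketch
{
    // Hypothetical per-message handler; here it only adds up payload sizes.
    private static void ProcessMessage(byte[] message, int[] totalBytes, CountdownEvent done)
    {
        Interlocked.Add(ref totalBytes[0], message.Length);
        done.Signal(); // tell the caller this work item is finished
    }

    // Queues one thread pool work item per message and waits for all of them.
    public static int Run(byte[][] messages)
    {
        int[] total = new int[1];
        using (var done = new CountdownEvent(messages.Length))
        {
            foreach (byte[] message in messages)
            {
                byte[] captured = message; // capture a per-iteration copy
                ThreadPool.QueueUserWorkItem(_ => ProcessMessage(captured, total, done));
            }
            done.Wait();
        }
        return total[0];
    }
}
```

Note that work items queued this way may run concurrently and in any order, so this approach only fits if messages from one client do not have to be processed in sequence.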
Advantages:
Disadvantages:
Upvotes: 0
Reputation: 27864
First, I wouldn't use a new thread for each message received. Either start a thread that waits for new work rather than exiting, or use the ThreadPool.
If you're getting an InvalidOperationException during enumeration, that means you're iterating over a list or other IEnumerable. Does that mean that you're doing something like this?
    void DeQueueMessages(ZClient client)
    {
        foreach(Message m in client.messageList)
        {
            ...
        }
        client.messageList.Clear();
    }
If so, switch to using the actual Queue class, and do this while loop:
    void DeQueueMessages(ZClient client)
    {
        while(client.messageQueue.Count > 0)
        {
            Message m = client.messageQueue.Dequeue();
            ...
        }
    }
If memory serves, the standard System.Collections.Generic.Queue class isn't thread safe, so I'd go with System.Collections.Concurrent.ConcurrentQueue.
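With ConcurrentQueue, the check-then-dequeue pattern collapses into a single TryDequeue call, which avoids the race between reading Count and calling Dequeue. The sketch below assumes a queue of byte arrays, as in the question's update; the ConcurrentQueueSketch class and its Drain method are illustrative names only.

```csharp
using System;
using System.Collections.Concurrent;

public static class ConcurrentQueueSketch
{
    // Drains whatever is currently in the queue and returns how many
    // messages were handled. Other threads may keep enqueuing meanwhile.
    public static int Drain(ConcurrentQueue<byte[]> queue)
    {
        int handled = 0;
        byte[] message;
        // TryDequeue removes and returns the head atomically, or returns
        // false when the queue is empty.
        while (queue.TryDequeue(out message))
        {
            // ... process message here ...
            handled++;
        }
        return handled;
    }
}
```

The receive thread can call Enqueue on the same ConcurrentQueue at any time without additional locking.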
Upvotes: 2
Reputation: 5715
Copy the messages to a second queue that is then processed, and clear the incoming queue. You'll probably need to at least lock during the copy and clear operation.
Alternatively, dispatch individual message processing to worker threads as the messages are dequeued.
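One way to sketch the two-queue approach is shown below. Instead of copying and then clearing, this variant swaps in a fresh queue under a lock, which has the same effect with less work inside the lock. The SwapQueue class and its members are hypothetical names, not part of the question's code.

```csharp
using System;
using System.Collections.Generic;

public sealed class SwapQueue
{
    private readonly object _gate = new object();
    private Queue<byte[]> _incoming = new Queue<byte[]>();

    // Called by the receiving thread for each disassembled message.
    public void Enqueue(byte[] message)
    {
        lock (_gate)
        {
            _incoming.Enqueue(message);
        }
    }

    // Called by the processing thread: takes everything queued so far and
    // leaves an empty queue behind. The lock is held only for the swap.
    public Queue<byte[]> TakeAll()
    {
        lock (_gate)
        {
            Queue<byte[]> batch = _incoming;
            _incoming = new Queue<byte[]>();
            return batch;
        }
    }
}
```

The processing thread can then enumerate or dequeue the returned batch freely, since no other thread holds a reference to it.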
Upvotes: 0