Reputation: 830
Recently I had to build a small TCP client app that connects to an external app's TCP listener and is intended to handle large volumes of data at high frequency.
I made a wrapper class around the TcpClient class, just to catch exceptions and keep references to some properties of interest (network stream etc.). Here is the wrapper:
public class MyTCPClient
{
private string serverIP;
private int serverPort;
public TcpClient tcpClient = new TcpClient();
private IPEndPoint serverEndPoint;
private NetworkStream stream = null;
public string name;
public MyTCPClient(string serverIp, int serverPort, string parentName)
{
this.serverIP = serverIp;
this.serverPort = serverPort;
this.name = parentName + "_TCPClient";
serverEndPoint = new IPEndPoint(IPAddress.Parse(serverIP), serverPort);
tcpClient.ReceiveBufferSize = 1048576;
this.TryConnect();
}
private bool TryConnect()
{
try
{
tcpClient.Connect(serverEndPoint);
}
catch (SocketException e1)
{
throw new ErrorOnConnectingException(e1, "SocketException while connecting. (see msdn Remarks section for more details. ) Error code: " + e1.ErrorCode);
}
catch (ArgumentNullException e2)
{
throw new ErrorOnConnectingException(e2, "ArgumentNullException while connecting. (The hostname parameter is null.) Message: " + e2.Message);
}
catch (ArgumentOutOfRangeException e3)
{
throw new ErrorOnConnectingException(e3, "ArgumentOutOfRangeException while connecting (The port parameter is not between MinPort and MaxPort. ). Message: " + e3.Message);
}
catch (ObjectDisposedException e4)
{
throw new ErrorOnConnectingException(e4, "ObjectDisposedException while connecting. (TcpClient is closed. ) Message: " + e4.Message);
}
try
{
stream = this.tcpClient.GetStream();
}
catch (ObjectDisposedException e1)
{
throw new ErrorOnGettingStreamException(e1, "ObjectDisposedException while acquiring Network stream. (The TcpClient has been closed. ) Message: " + e1.Message);
}
catch (InvalidOperationException e2)
{
throw new ErrorOnGettingStreamException(e2, "InvalidOperationException while acquiring Network stream (The TcpClient is not connected to a remote host. ). Message: " + e2.Message);
}
return true;
}
public string ReadData()
{
try
{
ASCIIEncoding encoder = new ASCIIEncoding();
byte[] dataHeader = new byte[12];
if (this.tcpClient.Connected)
{
stream.Read(dataHeader, 0, 12);
}
else
{
throw new ErrorOnReadingException(null, "The underlying TCP tcpClient is not connected any more");
}
var strHeaderMessage = System.Text.Encoding.Default.GetString(dataHeader);
Utils.logToTimeStampedFile(strHeaderMessage, name);
int bodyAndTailCount = Convert.ToInt32(strHeaderMessage.Replace("#", ""));
byte[] dataBodyAndTail = new byte[bodyAndTailCount];
if (this.tcpClient.Connected)
{
stream.Read(dataBodyAndTail, 0, bodyAndTailCount);
}
else
{
throw new ErrorOnReadingException(null, "The underlying TCP tcpClient is not connected any more");
}
var strBodyAndTailMessage = System.Text.Encoding.Default.GetString(dataBodyAndTail);
Utils.logToTimeStampedFile(strBodyAndTailMessage, name);
return strBodyAndTailMessage;
}
catch (FormatException e0)
{
CloseAllLeft();
throw new ErrorOnReadingException(e0, "FormatException while reading data. (Bytes read are null or do not correspond to specification, happens on closing Server) Message: " + e0.Message);
}
catch (ArgumentNullException e1)
{
CloseAllLeft();
throw new ErrorOnReadingException(e1, "ArgumentNullException while reading data. (The buffer parameter is null.) Message: " + e1.Message);
}
catch (ArgumentOutOfRangeException e2)
{
CloseAllLeft();
throw new ErrorOnReadingException(e2, "ArgumentOutOfRangeException while reading data. (see msdn description) Message: " + e2.Message);
}
catch (IOException e3)
{
CloseAllLeft();
throw new ErrorOnReadingException(e3, "IOException while reading data. (The underlying Socket is closed.) Message: " + e3.Message);
}
catch (ObjectDisposedException e4)
{
CloseAllLeft();
throw new ErrorOnReadingException(e4, "ObjectDisposedException while reading data. (The NetworkStream is closed.) Message: " + e4.Message);
}
}
public void CloseAllLeft()
{
try
{
stream.Close();
}
catch (Exception e)
{
Console.WriteLine("Exception closing tcp network stream: " + e.Message);
}
try
{
tcpClient.Close();
}
catch (Exception e)
{
Console.WriteLine("Exception closing tcpClient: " + e.Message);
}
}
}
So far I have said nothing about the threads using this MyTCPClient. The app should have two such TCP clients, connecting on different ports and doing different jobs. I was new to TCP programming, and after some wandering around the properties I decided to use the blocking read approach, i.e. by default the NetworkStream.Read() method blocks the thread until there is new data. I needed this approach because I do not have control over the external app's listener, and the only way to recognize that the server has closed is a read that returns zero bytes, as per the TCP sockets specs.
So I built an abstract class that maintains and controls the threads that later make use of the above MyTCPClient class (which, by design, might eventually block the parent threads). Here is the code for my abstract TCP manager:
/// <summary>
/// Serves as a dispatcher for the high frequency readings from the TCP pipe.
/// Each time the thread is started it initializes new TCPClients which will attempt to connect to server.
/// Once established, a TCP socket connection stays alive until the thread is requested to stop.
///
/// Error handling level here:
///
/// Resources like NetworkStream and TCPClients are ensured to be closed already within the MyTCPClient class, and the error handling here
/// builds on top of that - sending proper emails, notifications and logging.
///
/// </summary>
public abstract class AbstractmyTCPClientManager
{
public string name;
public string serverIP;
public int serverPort;
public Boolean requestStop = false;
public Boolean MyTCPClientThreadRunning = false;
public Boolean requestStart = false;
public MyTCPClient myTCPClient;
public int sleepInterval;
public Thread MyTCPClientThread;
public AbstractmyTCPClientManager(string name, string serverIP, int serverPort)
{
this.name = name;
this.serverIP = serverIP;
this.serverPort = serverPort;
}
public void ThreadRun()
{
MyTCPClientThreadRunning = false;
bool TCPSocketConnected = false;
bool AdditionalInitializationOK = false;
// keep trying to init requested tcp clients
while (!MyTCPClientThreadRunning && !requestStop) // and we are not suggested to stop
{
while (!TCPSocketConnected && !requestStop) // and we are not suggested to stop)
{
try
{
myTCPClient = new MyTCPClient(serverIP, serverPort, name);
TCPSocketConnected = true;
}
catch (ErrorOnConnectingException e0)
{
// nah, too long message
string detail = e0.originalException != null ? e0.originalException.Message : "No inner exception";
//Utils.logToTimeStampedFile("Creating connection attempt failed.(1." + e0.customMessage + " 2." + detail + "). Will retry in 10 seconds...", name);
//Utils.logToTimeStampedFile(e0.customMessage + " (" + detail + "). Will retry in 10 seconds...", name);
Utils.logToTimeStampedFile(detail + ". Will retry in 10 seconds...", name);
Thread.Sleep(10000);
}
catch (ErrorOnGettingStreamException e1)
{
// nah, too long message
string detail = e1.originalException != null ? e1.originalException.Message : "No inner exception";
//Utils.logToTimeStampedFile("Getting network stream attempt failed. (1." + e1.customMessage + " 2." + detail + "). Will retry in 10 seconds...", name);
//Utils.logToTimeStampedFile(e1.customMessage + " (" + detail + "). Will retry in 10 seconds...", name);
Utils.logToTimeStampedFile(detail + ". Will retry in 10 seconds...", name);
Thread.Sleep(10000);
}
}
Utils.logToTimeStampedFile("TCP Communication established", name);
while (!AdditionalInitializationOK && !requestStop) // or we are not suggested to stop
{
try
{
AdditionalInitialization();
AdditionalInitializationOK = true;
}
catch (AdditionalInitializationException e1)
{
string detail = e1.originalException != null ? e1.originalException.Message : "No inner exception";
//Utils.logToTimeStampedFile("Additional initialization failed (1." + e1.customMessage + " 2." + detail + "). Will retry in 10 seconds", name);
Utils.logToTimeStampedFile(e1.customMessage + ". Will retry in 10 seconds", name);
Thread.Sleep(10000);
}
}
MyTCPClientThreadRunning = TCPSocketConnected && AdditionalInitializationOK;
ViewModelLocator.ControlTabStatic.updateUIButtons();
}
Utils.logToTimeStampedFile("Additional Initialization successfully completed, thread started", name);
// while all is normal (i.e. nobody requested a stop) continuously sync with server (read data)
while (!requestStop)
{
try
{
syncWithInterface();
}
catch (ErrorOnReadingException e1)
{
string detail = e1.originalException != null ? e1.originalException.Message : "No inner exception";
//Utils.logToTimeStampedFile("Error occurred while reading data. (1." + e1.customMessage + " 2." + detail + ")", name);
Utils.logToTimeStampedFile(e1.customMessage, name);
if (!requestStop) // i.e. this is a genuine exception during normal flow, and nobody requested a thread stop (which might cause read exceptions as a consequence)
{
Utils.logToTimeStampedFile("There was no external stop request when the error occurred, doing tcp client restart.", name);
requestStop = true;
requestStart = true;
}
}
Thread.Sleep(sleepInterval);
}
// we need to close all after execution, but the execution may be closed before/while resources were still initializing
if (TCPSocketConnected)
{
myTCPClient.CloseAllLeft();
}
if (AdditionalInitializationOK)
{
ReleaseAdditionalResources();
}
// remember that thread is stopped
MyTCPClientThreadRunning = false;
Utils.logToTimeStampedFile("Thread stopped", name);
ViewModelLocator.ControlTabStatic.updateUIButtons();
// this serves as a restart
if (requestStart)
{
Utils.logToTimeStampedFile("Restarting thread...", name);
this.requestStop = false;
this.requestStart = false; // we are already processing a request start event, so reset this flag
this.MyTCPClientThread = new Thread(new ThreadStart(this.ThreadRun));
this.MyTCPClientThread.Name = this.name;
this.MyTCPClientThread.IsBackground = true;
this.MyTCPClientThread.Start();
}
}
/// <summary>
/// this method empties the entire TCP buffer, cycling through it
/// </summary>
private void syncWithInterface()
{
int counter = 0;
// read at most 100 messages at once (we assume that for 3 sec interval there might not be more,
//even if they are, it is still OK, they just will be processed next time)
while (counter < 100)
{
counter++;
string data = myTCPClient.ReadData();
ForwardData(data);
}
// below is left for testing:
/*
* "Sleep(0) or Yield is occasionally useful in production code for
* advanced performance tweaks. It’s also an excellent diagnostic tool
* for helping to uncover thread safety issues: if inserting Thread.Yield()
* anywhere in your code makes or breaks the program, you almost certainly have a bug."*/
Thread.Yield();
}
/// <summary>
/// Left for implementing in the caller that initialized the object. Meaning: one and the same way for receiving market/order data. Different ways of processing this data
/// </summary>
/// <param name="data"></param>
public abstract void ForwardData(string data);
/// <summary>
/// Left for implementing in child classes. Its purpose is to initialize any additional resources needed for the thread to operate.
/// If something goes wrong while getting these additional resources,
/// an AdditionalInitializationException should be thrown, which is then handled in the initialization phase of the caller.
/// </summary>
public abstract void AdditionalInitialization();
// counterpart of the AdditionalInitialization method - what is initialized should then be closed
public abstract void ReleaseAdditionalResources();
}
Later, each needed TCP communication channel would have a dedicated implementation of the above abstract class, providing implementations of the methods ForwardData (i.e. what to do with this data) and AdditionalInitialization (i.e. what else needs to be initialized before a particular TCP communication processing is run; for example, one of my threads required an additional storage thread to be initialized prior to receiving data).
Everything was fine, except for closing the TCP processing. I have these requestStop variables to control whether a thread should exit or continue, but the Read() method may block indefinitely, preventing even the requestStop variable from being read. (I should say that the two TCP channels I need to process are very different: one of them receives data very frequently and the other one only sporadically.) I would still like both to follow the same design. So from what I have read so far, I have to implement another "parent", "controlling", or "wrapper" thread that will take over the job of observing the requestStop flag.
I am looking towards solutions like this post, or timers like this post
Any suggestions would be greatly appreciated. Thanks!
Upvotes: 3
Views: 6709
Reputation: 24847
Set your 'requestStop' bool and Close the client socket from another thread. This causes the read() call to return 'early' with an error/exception. The client thread can check 'requestStop' after every read() return and clean up/exit if requested.
TBH, I rarely bother with explicitly shutting down such clients anyway. I just leave them until the app exits.
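A minimal sketch of that idea, under some assumptions: `StoppableReader` and `CloseDemo` are hypothetical names, the 12-byte buffer just mirrors the header size from the question, and the loopback listener only stands in for the external server. It is meant to show the shape of the approach rather than a drop-in replacement for the classes above.

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;

// Hypothetical helper: owns a reader thread that blocks in Read().
public sealed class StoppableReader
{
    private readonly TcpClient client;
    private readonly NetworkStream stream;
    private readonly Thread thread;
    private volatile bool requestStop;

    public StoppableReader(TcpClient connectedClient)
    {
        client = connectedClient;
        stream = client.GetStream(); // fetched before the reader thread starts
        thread = new Thread(ReadLoop) { IsBackground = true };
        thread.Start();
    }

    private void ReadLoop()
    {
        var buffer = new byte[12];
        try
        {
            while (!requestStop)
            {
                int n = stream.Read(buffer, 0, buffer.Length); // blocks here
                if (n == 0) break; // zero bytes: the server closed the connection
                // ... hand the bytes to the real message handling ...
            }
        }
        catch (IOException) when (requestStop) { /* expected: Stop() closed the socket */ }
        catch (ObjectDisposedException) when (requestStop) { /* same cause, different timing */ }
    }

    // Called from any other thread (UI, manager, ...).
    public bool Stop(int timeoutMs)
    {
        requestStop = true; // set the flag first so the reader knows why Read() failed
        client.Close();     // closing the socket makes the blocked Read() throw
        return thread.Join(timeoutMs);
    }
}

public static class CloseDemo
{
    public static bool Run()
    {
        var listener = new TcpListener(IPAddress.Loopback, 0); // port 0: any free port
        listener.Start();
        try
        {
            int port = ((IPEndPoint)listener.LocalEndpoint).Port;
            var client = new TcpClient();
            client.Connect(IPAddress.Loopback, port);
            using (TcpClient serverSide = listener.AcceptTcpClient())
            {
                var reader = new StoppableReader(client);
                Thread.Sleep(200);        // give the reader time to block in Read()
                return reader.Stop(5000); // true if the thread exited in time
            }
        }
        finally { listener.Stop(); }
    }
}
```

The exception filters only swallow the exceptions caused by the deliberate close; a read error that happens without a stop request still propagates, so real code would want its own handling for that case.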
Upvotes: 0
Reputation: 41757
I would recommend calling the ReadAsync method of NetworkStream and passing a CancellationToken to it. This way, the read operation can be easily cancelled (from another thread) when a request stop event is observed:
public class MyTCPClient : IDisposable
{
...
private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource ();
...
public string ReadData()
{
...
byte[] dataHeader = new byte[12];
if (this.tcpClient.Connected)
{
stream.ReadAsync(dataHeader, 0, 12, cancellationTokenSource.Token).Wait();
} ...
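A fuller sketch of the same idea, with two additions that are my own assumptions rather than part of the snippet above. First, Read/ReadAsync may return fewer bytes than requested, so the sketch loops until the requested count has arrived. Second, as far as I know, on the older .NET Framework the token passed to NetworkStream.ReadAsync is only checked before the read starts, so the sketch also registers a callback that closes the client when the token is cancelled. `CancellableReads`, `ReadExactlyAsync` and `Demo` are hypothetical names.

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableReads
{
    // Reads exactly 'count' bytes, or throws OperationCanceledException
    // once 'token' is cancelled from another thread.
    public static async Task<byte[]> ReadExactlyAsync(
        TcpClient client, int count, CancellationToken token)
    {
        NetworkStream stream = client.GetStream();
        var buffer = new byte[count];
        int offset = 0;
        // Fallback for runtimes where the token alone does not abort a pending read:
        // closing the client makes the pending ReadAsync fail.
        using (token.Register(() => client.Close()))
        {
            try
            {
                while (offset < count)
                {
                    int n = await stream
                        .ReadAsync(buffer, offset, count - offset, token)
                        .ConfigureAwait(false);
                    if (n == 0)
                        throw new EndOfStreamException("Remote side closed the connection.");
                    offset += n;
                }
            }
            catch (Exception e) when (token.IsCancellationRequested
                                      && (e is IOException || e is ObjectDisposedException))
            {
                // Report the deliberate close as a cancellation, not as an I/O error.
                throw new OperationCanceledException("Read cancelled by stop request.", e, token);
            }
        }
        return buffer;
    }

    // Loopback demo: the server side never sends, so the read can only end by cancellation.
    public static bool Demo()
    {
        var listener = new TcpListener(IPAddress.Loopback, 0); // port 0: any free port
        listener.Start();
        try
        {
            int port = ((IPEndPoint)listener.LocalEndpoint).Port;
            var client = new TcpClient();
            client.Connect(IPAddress.Loopback, port);
            using (TcpClient serverSide = listener.AcceptTcpClient())
            using (var cts = new CancellationTokenSource())
            {
                Task<byte[]> read = ReadExactlyAsync(client, 12, cts.Token);
                cts.CancelAfter(200); // simulates the stop request
                try { read.Wait(5000); return false; } // completing or timing out is unexpected
                catch (AggregateException ae)
                {
                    return ae.InnerException is OperationCanceledException;
                }
            }
        }
        finally { listener.Stop(); }
    }
}
```

Note that after a cancellation the client is closed either way, so the manager thread would have to create a fresh MyTCPClient if it wants to reconnect.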
Upvotes: 2
Reputation: 2517
I would personally use asynchronous sockets for this: http://msdn.microsoft.com/en-us/library/bbx2eya8.aspx
If, however, you still want to use blocking reads, one option is simply to Close() the socket from another thread.
I hope this helps.
Upvotes: 3