Reputation:
This is related to my other question, except that now I tried async in the hope that it would fix the issues. It doesn't.
I'm trying to create a simple SOCKS5 server. I set my browser (Firefox) to use this program as a SOCKS5 proxy. The idea is that a program connects to the proxy server, gives it the required information, and the server simply reads/writes data from one connection to the other. This one does just that and doesn't log or filter anything. It is dead simple, but the CPU usage and the fact that it takes several seconds to connect to a site after you hit a few pages make it completely unusable. How on earth is this eating up so much CPU? And why does it take a long time to connect to a site? Both the async and sync versions suffer from this.
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net.Sockets;
using System.Timers;
using System.IO;
using System.Net;
using System.Threading;
namespace ProxyTest
{
class Program
{
static ManualResetEvent tcpClientConnected =new ManualResetEvent(false);
static void Main(string[] args)
{
var s2 = new TcpListener(9998);
s2.Start();
Task.Run(() =>
{
while (true)
{
tcpClientConnected.Reset();
s2.BeginAcceptTcpClient(Blah, s2);
tcpClientConnected.WaitOne();
}
});
while (true)
System.Threading.Thread.Sleep(10000000);
}
static void Blah(IAsyncResult ar)
{
try
{
Console.WriteLine("Connection");
TcpListener listener = (TcpListener)ar.AsyncState;
using (var socketin = listener.EndAcceptTcpClient(ar))
{
tcpClientConnected.Set();
var ns1 = socketin.GetStream();
var r1 = new BinaryReader(ns1);
var w1 = new BinaryWriter(ns1);
if (false)
{
var s3 = new TcpClient();
s3.Connect("127.0.0.1", 9150);
var ns3 = s3.GetStream();
var r3 = new BinaryReader(ns3);
var w3 = new BinaryWriter(ns3);
while (true)
{
while (ns1.DataAvailable)
{
var b = ns1.ReadByte();
w3.Write((byte)b);
//Console.WriteLine("1: {0}", b);
}
while (ns3.DataAvailable)
{
var b = ns3.ReadByte();
w1.Write((byte)b);
Console.WriteLine("2: {0}", b);
}
}
}
{
if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
return;
var c = r1.ReadByte();
for (int i = 0; i < c; ++i)
r1.ReadByte();
w1.Write((byte)5);
w1.Write((byte)0);
}
{
if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
return;
if (r1.ReadByte() != 0)
return;
}
byte[] ipAddr = null;
string hostname = null;
var type = r1.ReadByte();
switch (type)
{
case 1:
ipAddr = r1.ReadBytes(4);
break;
case 3:
hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte()));
break;
case 4:
throw new Exception();
}
var nhport = r1.ReadInt16();
var port = IPAddress.NetworkToHostOrder(nhport);
var socketout = new TcpClient();
if (hostname != null)
socketout.Connect(hostname, port);
else
socketout.Connect(new IPAddress(ipAddr), port);
w1.Write((byte)5);
w1.Write((byte)0);
w1.Write((byte)0);
w1.Write(type);
switch (type)
{
case 1:
w1.Write(ipAddr);
break;
case 2:
w1.Write((byte)hostname.Length);
w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length);
break;
}
w1.Write(nhport);
var buf1 = new byte[4096];
var buf2 = new byte[4096];
var ns2 = socketout.GetStream();
var r2 = new BinaryReader(ns2);
var w2 = new BinaryWriter(ns2);
Task.Run(() =>
{
var re = new ManualResetEvent(false);
while (true)
{
re.Reset();
ns1.BeginRead(buf1, 0, buf1.Length, ReadCallback, new A() { buf = buf1, thisSocket = socketin, otherSocket = socketout, thisStream = ns1, otherStream = ns2, re=re });
re.WaitOne();
}
});
Task.Run(() =>
{
var re = new ManualResetEvent(false);
while (true)
{
re.Reset();
ns2.BeginRead(buf2, 0, buf2.Length, ReadCallback, new A() { buf = buf2, thisSocket = socketout, otherSocket = socketin, thisStream = ns2, otherStream = ns1, re = re });
re.WaitOne();
}
});
while (true)
{
if (socketin.Connected == false)
return;
Thread.Sleep(100);
}
}
}
catch { }
}
class A { public byte[] buf; public TcpClient thisSocket, otherSocket; public NetworkStream thisStream, otherStream; public ManualResetEvent re;};
static void ReadCallback(IAsyncResult ar)
{
try
{
var a = (A)ar.AsyncState;
var ns1 = a.thisStream;
var len = ns1.EndRead(ar);
a.otherStream.Write(a.buf, 0, len);
a.re.Set();
}
catch
{
}
}
}
}
Upvotes: 4
Views: 4918
Reputation: 12330
There are a few things going on here!
The async calls are all made in a synchronous style: the thread that starts the operation immediately calls WaitOne, which makes it equivalent to a synchronous call.
Sleep loops are bad. A sleep(1) loop will respond quickly but use some CPU, a sleep(1000) loop will respond slowly but use less CPU. Having a dozen threads in a sleep loop doesn't use much CPU, but if the number of threads keeps increasing, CPU usage will become significant. The best way is to use async calls instead of polling.
Lots of tasks running loops. Without guaranteed exit paths these cause the thread count to skyrocket.
If you are forwarding data from socket A to socket B, you need to act when either socket is closed: cease forwarding, ensure that pending writes complete and close the sockets.
The current implementation doesn't properly ensure both forwarding tasks are closed if one closes, and the technique of starting a task then blocking on a manual reset event can fail if the task gets an exception prior to setting the event. Both cases leave a task running ad infinitum.
Checking Socket.Connected seems like an obvious thing to do, but in practice it is just a cached flag recording whether the last I/O operation encountered a disconnect.
I prefer to act on "zero recv"s (reads that return zero bytes), which are your first notification of a disconnect.
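To illustrate acting on a zero-byte read, here is a minimal sketch written against the Stream base class, so it applies to a NetworkStream as well. The class and method names are made up for this example and are not part of the code in this thread.

```csharp
using System;
using System.IO;

static class DisconnectDetection
{
    // Copies data from source to destination until a read returns zero bytes.
    // On a NetworkStream, a zero-byte read means the peer closed its side.
    // Returns the total number of bytes forwarded.
    public static long PumpUntilClosed(Stream source, Stream destination)
    {
        var buffer = new byte[4096];
        long total = 0;
        while (true)
        {
            // Blocks until data arrives or the stream ends; no polling or sleeping.
            int n = source.Read(buffer, 0, buffer.Length);
            if (n == 0)
                break; // zero recv: stop forwarding in this direction
            destination.Write(buffer, 0, n);
            total += n;
        }
        return total;
    }
}
```

Because Read blocks until something happens, there is no need to consult Socket.Connected or to sleep between checks.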
I knocked up a quick async version of your original synchronous routine using PowerThreading via NuGet (this is a way of doing async routines prior to framework 4.5).
This works using TcpListener with zero CPU usage and a very low number of threads.
This can be done in vanilla C# using async/await... I just don't know how yet :)
using System;
using System.Collections.Generic;
using System.Text;
namespace AeProxy
{
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
// Need to install Wintellect.Threading via NuGet for this:
using Wintellect.Threading.AsyncProgModel;
class Program
{
static void Main(string[] args)
{
var ae = new AsyncEnumerator() {SyncContext = null};
var mainOp = ae.BeginExecute(ListenerFiber(ae), null, null);
// block until main server is finished
ae.EndExecute(mainOp);
}
static IEnumerator<int> ListenerFiber(AsyncEnumerator ae)
{
var listeningServer = new TcpListener(IPAddress.Loopback, 9998);
listeningServer.Start();
while (!ae.IsCanceled())
{
listeningServer.BeginAcceptTcpClient(ae.End(0, listeningServer.EndAcceptTcpClient), null);
yield return 1;
if (ae.IsCanceled()) yield break;
var clientSocket = listeningServer.EndAcceptTcpClient(ae.DequeueAsyncResult());
var clientAe = new AsyncEnumerator() { SyncContext = null };
clientAe.BeginExecute(
ClientFiber(clientAe, clientSocket),
ar =>
{
try
{
clientAe.EndExecute(ar);
}
catch { }
}, null);
}
}
static long clients = 0;
static IEnumerator<int> ClientFiber(AsyncEnumerator ae, TcpClient clientSocket)
{
Console.WriteLine("ClientFibers ++{0}", Interlocked.Increment(ref clients));
try
{
// original code to do handshaking and connect to remote host
var ns1 = clientSocket.GetStream();
var r1 = new BinaryReader(ns1);
var w1 = new BinaryWriter(ns1);
if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1)) yield break;
var c = r1.ReadByte();
for (int i = 0; i < c; ++i) r1.ReadByte();
w1.Write((byte)5);
w1.Write((byte)0);
if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1)) yield break;
if (r1.ReadByte() != 0) yield break;
byte[] ipAddr = null;
string hostname = null;
var type = r1.ReadByte();
switch (type)
{
case 1:
ipAddr = r1.ReadBytes(4);
break;
case 3:
hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte()));
break;
case 4:
throw new Exception();
}
var nhport = r1.ReadInt16();
// Cast to ushort so that ports above 32767 do not come out negative.
var port = (ushort)IPAddress.NetworkToHostOrder(nhport);
var socketout = new TcpClient();
if (hostname != null) socketout.Connect(hostname, port);
else socketout.Connect(new IPAddress(ipAddr), port);
w1.Write((byte)5);
w1.Write((byte)0);
w1.Write((byte)0);
w1.Write(type);
switch (type)
{
case 1:
w1.Write(ipAddr);
break;
case 3:
w1.Write((byte)hostname.Length);
w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length);
break;
}
w1.Write(nhport);
using (var ns2 = socketout.GetStream())
{
var forwardAe = new AsyncEnumerator() { SyncContext = null };
forwardAe.BeginExecute(
ForwardingFiber(forwardAe, ns1, ns2), ae.EndVoid(0, forwardAe.EndExecute), null);
yield return 1;
if (ae.IsCanceled()) yield break;
forwardAe.EndExecute(ae.DequeueAsyncResult());
}
}
finally
{
Console.WriteLine("ClientFibers --{0}", Interlocked.Decrement(ref clients));
}
}
private enum Operation { OutboundWrite, OutboundRead, InboundRead, InboundWrite }
const int bufsize = 4096;
static IEnumerator<int> ForwardingFiber(AsyncEnumerator ae, NetworkStream inputStream, NetworkStream outputStream)
{
while (!ae.IsCanceled())
{
byte[] outputRead = new byte[bufsize], outputWrite = new byte[bufsize];
byte[] inputRead = new byte[bufsize], inputWrite = new byte[bufsize];
// start off output and input reads.
// NB ObjectDisposedExceptions can be raised here when a socket is closed while an async read is in progress.
outputStream.BeginRead(outputRead, 0, bufsize, ae.End(1, ar => outputStream.EndRead(ar)), Operation.OutboundRead);
inputStream.BeginRead(inputRead, 0, bufsize, ae.End(1, ar => inputStream.EndRead(ar)), Operation.InboundRead);
var pendingops = 2;
while (!ae.IsCanceled())
{
// wait for the next operation to complete, the state object passed to each async
// call can be used to find out what completed.
if (pendingops == 0) yield break;
yield return 1;
if (!ae.IsCanceled())
{
int byteCount;
var latestEvent = ae.DequeueAsyncResult();
var currentOp = (Operation)latestEvent.AsyncState;
if (currentOp == Operation.InboundRead)
{
byteCount = inputStream.EndRead(latestEvent);
if (byteCount == 0)
{
pendingops--;
outputStream.Close();
continue;
}
Array.Copy(inputRead, outputWrite, byteCount);
outputStream.BeginWrite(outputWrite, 0, byteCount, ae.EndVoid(1, outputStream.EndWrite), Operation.OutboundWrite);
inputStream.BeginRead(inputRead, 0, bufsize, ae.End(1, ar => inputStream.EndRead(ar)), Operation.InboundRead);
}
else if (currentOp == Operation.OutboundRead)
{
byteCount = outputStream.EndRead(latestEvent);
if (byteCount == 0)
{
pendingops--;
inputStream.Close();
continue;
}
Array.Copy(outputRead, inputWrite, byteCount);
inputStream.BeginWrite(inputWrite, 0, byteCount, ae.EndVoid(1, inputStream.EndWrite), Operation.InboundWrite);
outputStream.BeginRead(outputRead, 0, bufsize, ae.End(1, ar => outputStream.EndRead(ar)), Operation.OutboundRead);
}
else if (currentOp == Operation.InboundWrite)
{
inputStream.EndWrite(latestEvent);
}
else if (currentOp == Operation.OutboundWrite)
{
outputStream.EndWrite(latestEvent);
}
}
}
}
}
}
}
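For comparison, the forwarding part might look roughly like this with async/await on .NET 4.5 or later. This is only a sketch under simplifying assumptions: AsyncRelay, PumpAsync and RelayAsync are hypothetical names, and it closes both streams as soon as either direction ends rather than supporting half-closed connections.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static class AsyncRelay
{
    // Copies one direction until a zero-byte read signals that the peer closed.
    static async Task PumpAsync(Stream from, Stream to)
    {
        var buffer = new byte[4096];
        int n;
        while ((n = await from.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false)) > 0)
        {
            await to.WriteAsync(buffer, 0, n).ConfigureAwait(false);
        }
    }

    // Runs both directions without blocking a thread while waiting for data.
    public static async Task RelayAsync(Stream client, Stream remote)
    {
        Task clientToRemote = PumpAsync(client, remote);
        Task remoteToClient = PumpAsync(remote, client);

        // When either direction ends (close or error), close both streams so
        // the other pending read completes instead of lingering forever.
        await Task.WhenAny(clientToRemote, remoteToClient).ConfigureAwait(false);
        client.Dispose();
        remote.Dispose();

        // Observe both tasks; the slower one usually faults because its
        // stream was just disposed, which is expected here.
        try
        {
            await Task.WhenAll(clientToRemote, remoteToClient).ConfigureAwait(false);
        }
        catch (IOException) { }
        catch (ObjectDisposedException) { }
    }
}
```

After the handshake, a caller would do something like `await AsyncRelay.RelayAsync(clientSocket.GetStream(), socketout.GetStream());`.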
Upvotes: 5
Reputation: 14417
You should use async versions of TcpClient methods instead of spawning threads.
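A minimal sketch of what that could look like for the accept loop, assuming .NET 4.5 or later. AsyncAcceptLoop and the handleClient delegate are hypothetical names for this example; the handler stands in for the handshake and forwarding logic.

```csharp
using System;
using System.Net.Sockets;
using System.Threading.Tasks;

static class AsyncAcceptLoop
{
    // Accepts connections without dedicating a thread to each one.
    // The loop ends with an exception once the listener is stopped.
    public static async Task RunAsync(TcpListener listener, Func<TcpClient, Task> handleClient)
    {
        listener.Start();
        while (true)
        {
            TcpClient client = await listener.AcceptTcpClientAsync().ConfigureAwait(false);
            // Not awaited on purpose: the next accept starts immediately.
            var ignored = HandleSafelyAsync(client, handleClient);
        }
    }

    // Disposes the client when the handler finishes and keeps one failed
    // connection from taking down the accept loop.
    static async Task HandleSafelyAsync(TcpClient client, Func<TcpClient, Task> handleClient)
    {
        using (client)
        {
            try
            {
                await handleClient(client).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Connection failed: {0}", ex.Message);
            }
        }
    }
}
```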
Upvotes: 0
Reputation: 442
Caveat: I had to adjust things slightly since I'm not using 4.5.
Task.Run() --> new Thread().Start()
You are using far too many threads. Simply attempting to load this question on Stack Overflow caused 30+ threads to spawn, which reproduces the behavior seen using Task.Run().
With your code cut down to a single thread per connection, my CPU usage is hovering around 0%. Everything loads quickly.
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.ComponentModel;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net.Sockets;
using System.Timers;
using System.IO;
using System.Net;
using System.Threading;
namespace SOCKS5
{
static class Program
{
static void Main()
{
// The port-only constructor is obsolete; bind explicitly to all interfaces.
var s2 = new TcpListener(IPAddress.Any, 9998);
s2.Start();
while (true)
{
if (s2.Pending())
{
Thread test = new Thread(() =>
{
using (TcpClient client = s2.AcceptTcpClient())
{
Blah(client);
}
});
test.Start();
}
Thread.Sleep(10);
}
}
static void Blah(TcpClient listener)
{
try
{
Console.WriteLine("Connection");
//TcpListener listener = (TcpListener)ar.AsyncState;
//tcpClientConnected.Set();
var ns1 = listener.GetStream();
var r1 = new BinaryReader(ns1);
var w1 = new BinaryWriter(ns1);
if (false)
{
var s3 = new TcpClient();
s3.Connect("127.0.0.1", 9150);
var ns3 = s3.GetStream();
var r3 = new BinaryReader(ns3);
var w3 = new BinaryWriter(ns3);
while (true)
{
while (ns1.DataAvailable)
{
var b = ns1.ReadByte();
w3.Write((byte)b);
//Console.WriteLine("1: {0}", b);
}
while (ns3.DataAvailable)
{
var b = ns3.ReadByte();
w1.Write((byte)b);
Console.WriteLine("2: {0}", b);
}
}
}
{
if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
return;
var c = r1.ReadByte();
for (int i = 0; i < c; ++i)
r1.ReadByte();
w1.Write((byte)5);
w1.Write((byte)0);
}
{
if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1))
return;
if (r1.ReadByte() != 0)
return;
}
byte[] ipAddr = null;
string hostname = null;
var type = r1.ReadByte();
switch (type)
{
case 1:
ipAddr = r1.ReadBytes(4);
break;
case 3:
hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte()));
break;
case 4:
throw new Exception();
}
var nhport = r1.ReadInt16();
// Cast to ushort so that ports above 32767 do not come out negative.
var port = (ushort)IPAddress.NetworkToHostOrder(nhport);
var socketout = new TcpClient();
if (hostname != null)
socketout.Connect(hostname, port);
else
socketout.Connect(new IPAddress(ipAddr), port);
w1.Write((byte)5);
w1.Write((byte)0);
w1.Write((byte)0);
w1.Write(type);
switch (type)
{
case 1:
w1.Write(ipAddr);
break;
case 3: // address type 3 is a domain name, matching the request parsing above
w1.Write((byte)hostname.Length);
w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length);
break;
}
w1.Write(nhport);
var buf1 = new byte[4096];
var buf2 = new byte[4096];
var ns2 = socketout.GetStream();
DateTime last = DateTime.Now;
while ((DateTime.Now - last).TotalMinutes < 5.0)
{
if (ns1.DataAvailable)
{
int size = ns1.Read(buf1, 0, buf1.Length);
ns2.Write(buf1, 0, size);
last = DateTime.Now;
}
if (ns2.DataAvailable)
{
int size = ns2.Read(buf2, 0, buf2.Length);
ns1.Write(buf2, 0, size);
last = DateTime.Now;
}
Thread.Sleep(10);
}
}
catch { }
finally
{
try
{
listener.Close();
}
catch (Exception) { }
}
}
}
}
Edit:
This ended up being kinda fun to mess with.
After routing Firefox traffic through this for a few hours, some observations.
I never noticed a regular pattern that would tell me when to close connections. Letting threads terminate after they've been idle for 5 minutes (no rx/tx) keeps the thread count fairly low. It's a pretty safe bound that allows services such as Gmail chat to keep functioning.
For some reason, the program would occasionally not receive requests from the browser, which would then report a timeout. There was no notification of a missed request in the program, nothing. I only noticed it when browsing Stack Overflow. I still haven't figured that one out.
Upvotes: 6
Reputation: 936
You should take a look at overlapped I/O. One thread per connection may work fine, but in general it's a bad idea.
Upvotes: 0
Reputation: 1387
In these lines...
while (true)
System.Threading.Thread.Sleep(10000000);
Wouldn't it be better to replace them with a simple:
Console.ReadKey();
That is the only CPU-consuming thing I see.
Also, as a suggestion, you should limit the number of incoming connections and use a thread pool pattern (with a queue or something similar).
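One possible way to cap the number of connections being served at once is a SemaphoreSlim, sketched below. ConnectionLimiter is a hypothetical helper for this example, not an existing framework class, and it assumes .NET 4.5 or later for WaitAsync.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

sealed class ConnectionLimiter
{
    private readonly SemaphoreSlim slots;

    public ConnectionLimiter(int maxConcurrent)
    {
        slots = new SemaphoreSlim(maxConcurrent, maxConcurrent);
    }

    // Number of connections that could start right now without waiting.
    public int FreeSlots
    {
        get { return slots.CurrentCount; }
    }

    // Waits for a free slot, runs the work, and always releases the slot,
    // even if the work throws.
    public async Task RunAsync(Func<Task> work)
    {
        await slots.WaitAsync().ConfigureAwait(false);
        try
        {
            await work().ConfigureAwait(false);
        }
        finally
        {
            slots.Release();
        }
    }
}
```

Each accepted connection would be wrapped in `limiter.RunAsync(...)`, so connections beyond the limit wait their turn instead of spawning more work.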
Upvotes: 0