Reputation: 2329
I have the following requirements for a server/client architecture:
Write a server/client that works asynchronously.
The communication needs to be duplex, i.e., reads and writes on both ends.
Multiple clients can connect to the server at any given time.
Server and client should wait until the other side becomes available and then make a connection.
Once a client connects, it should write to the stream.
Then the server should read from the stream and write a response back to the client.
Finally, the client should read the response and the communication should end.
With these requirements in mind I've written the following code, but I'm not too sure about it because the docs for pipes are unfortunately somewhat lacking, and the code doesn't seem to work correctly: it hangs at a certain point.
namespace PipesAsyncAwait471
{
    using System;
    using System.Collections.Generic;
    using System.IO.Pipes;
    using System.Linq;
    using System.Threading.Tasks;

    internal class Program
    {
        private static async Task Main()
        {
            List<Task> tasks = new List<Task> {
                HandleRequestAsync(),
            };

            tasks.AddRange(Enumerable.Range(0, 10).Select(i => SendRequestAsync(i, 0, 5)));

            await Task.WhenAll(tasks);
        }

        private static async Task HandleRequestAsync()
        {
            using (NamedPipeServerStream server = new NamedPipeServerStream("MyPipe",
                                                                            PipeDirection.InOut,
                                                                            NamedPipeServerStream.MaxAllowedServerInstances,
                                                                            PipeTransmissionMode.Message,
                                                                            PipeOptions.Asynchronous))
            {
                Console.WriteLine("Waiting...");

                await server.WaitForConnectionAsync().ConfigureAwait(false);

                if (server.IsConnected)
                {
                    Console.WriteLine("Connected");

                    if (server.CanRead) {
                        // Read something...
                    }

                    if (server.CanWrite) {
                        // Write something...
                        await server.FlushAsync().ConfigureAwait(false);
                        server.WaitForPipeDrain();
                    }

                    server.Disconnect();

                    await HandleRequestAsync().ConfigureAwait(false);
                }
            }
        }

        private static async Task SendRequestAsync(int index, int counter, int max)
        {
            using (NamedPipeClientStream client = new NamedPipeClientStream(".", "MyPipe", PipeDirection.InOut, PipeOptions.Asynchronous))
            {
                await client.ConnectAsync().ConfigureAwait(false);

                if (client.IsConnected)
                {
                    Console.WriteLine($"Index: {index} Counter: {counter}");

                    if (client.CanWrite) {
                        // Write something...
                        await client.FlushAsync().ConfigureAwait(false);
                        client.WaitForPipeDrain();
                    }

                    if (client.CanRead) {
                        // Read something...
                    }
                }

                if (counter <= max) {
                    await SendRequestAsync(index, ++counter, max).ConfigureAwait(false);
                }
                else {
                    Console.WriteLine($"{index} Done!");
                }
            }
        }
    }
}
Assumptions:
The way I expect it to work is for all the requests I make by calling SendRequestAsync to execute concurrently, with each request then making additional requests until its counter reaches 6, and finally it should print "Done!".
Remarks:
I've tested it on .NET Framework 4.7.1 and .NET Core 2.0 and I get the same results.
The communication between the clients and the server is always local to the machine. The clients are web applications that can queue jobs such as launching third-party processes, and the server is going to be deployed as a Windows service on the same machine as the web server that hosts these clients.
Upvotes: 7
Views: 11502
Reputation: 2329
Here is the complete code after some iterations:
PipeServer.cs:
namespace AsyncPipes;

using System.Diagnostics.CodeAnalysis;
using System.IO.Pipes;

public static class PipeServer
{
    public static void WaitForConnection()
        => WaitForConnectionInitializer();

    private static void WaitForConnectionInitializer()
    {
        var context = new ServerContext();
        var server = context.Server;

        try
        {
            Console.WriteLine("Waiting for a client...");
            server.BeginWaitForConnection(WaitForConnectionCallback, context);
        }
        catch
        {
            // We only need to clean up here when something goes wrong.
            context.Dispose();
            throw;
        }

        static void WaitForConnectionCallback(IAsyncResult result)
        {
            var (context, server, _) = ServerContext.FromResult(result);
            server.EndWaitForConnection(result);
            WaitForConnectionInitializer();
            BeginRead(context);
        }

        static void BeginRead(ServerContext context)
        {
            var (_, server, requestBuffer) = context;
            server.BeginRead(requestBuffer, 0, requestBuffer.Length, ReadCallback, context);
        }

        static void BeginWrite(ServerContext context)
        {
            var (_, server, responseBuffer) = context;
            server.BeginWrite(responseBuffer, 0, responseBuffer.Length, WriteCallback, context);
        }

        static void ReadCallback(IAsyncResult result)
        {
            var (context, server, requestBuffer) = ServerContext.FromResult(result);
            var bytesRead = server.EndRead(result);

            if (bytesRead > 0)
            {
                if (!server.IsMessageComplete)
                {
                    BeginRead(context);
                }
                else
                {
                    var index = BitConverter.ToInt32(requestBuffer, 0);
                    Console.WriteLine($"{index} Request.");
                    BeginWrite(context);
                }
            }
        }

        static void WriteCallback(IAsyncResult result)
        {
            var (context, server, responseBuffer) = ServerContext.FromResult(result);
            var index = -1;

            try
            {
                server.EndWrite(result);
                server.WaitForPipeDrain();
                index = BitConverter.ToInt32(responseBuffer, 0);
                Console.WriteLine($"{index} Pong.");
            }
            finally
            {
                context.Dispose();
                Console.WriteLine($"{index} Disposed.");
            }
        }
    }

    private sealed class ServerContext : IDisposable
    {
        [NotNull]
        public byte[]? Buffer { get; private set; } = new byte[4];

        [NotNull]
        public NamedPipeServerStream? Server { get; private set; } = new("PipesDemo",
            PipeDirection.InOut,
            NamedPipeServerStream.MaxAllowedServerInstances,
            PipeTransmissionMode.Message,
            PipeOptions.Asynchronous);

        public void Deconstruct(out ServerContext context, out NamedPipeServerStream server, out byte[] buffer)
            => (context, server, buffer) = (this, Server, Buffer);

        public static ServerContext FromResult(IAsyncResult result)
        {
            ArgumentNullException.ThrowIfNull(result.AsyncState);
            return (ServerContext)result.AsyncState;
        }

        public void Dispose()
        {
            if (Server is not null)
            {
                if (Server.IsConnected)
                {
                    Server.Disconnect();
                }

                Server.Dispose();
            }

            Server = null;
            Buffer = null;
        }
    }
}
PipeClient.cs:
namespace AsyncPipes;

using System.IO.Pipes;

public static class PipeClient
{
    public static void CreateConnection(int index)
    {
        using var client = new NamedPipeClientStream(".", "PipesDemo", PipeDirection.InOut, PipeOptions.None);
        client.Connect();

        var requestBuffer = BitConverter.GetBytes(index);
        client.Write(requestBuffer, 0, requestBuffer.Length);
        client.Flush();
        client.WaitForPipeDrain();
        Console.WriteLine($"{index} Ping.");

        // Keep reading until the 4-byte response is complete or the pipe is closed.
        var responseBuffer = new byte[4];
        var totalRead = 0;
        while (totalRead < responseBuffer.Length)
        {
            var bytesRead = client.Read(responseBuffer, totalRead, responseBuffer.Length - totalRead);
            if (bytesRead == 0)
            {
                break;
            }

            totalRead += bytesRead;
        }

        index = BitConverter.ToInt32(responseBuffer, 0);
        Console.WriteLine($"{index} Response.");
    }
}
Program.cs:
namespace AsyncPipes;

internal class Program
{
    private const int MaxRequests = 1000;

    private static void Main()
    {
        var tasks = new List<Task>
        {
            Task.Run(PipeServer.WaitForConnection)
        };

        tasks.AddRange(Enumerable.Range(0, MaxRequests - 1)
            .Select(i => Task.Factory.StartNew(() => PipeClient.CreateConnection(i),
                                               TaskCreationOptions.LongRunning)));

        Task.WaitAll(tasks.ToArray());
        Console.ReadKey();
    }
}
You can sort the messages and observe the following:
Connections are opened and closed correctly.
Data is sent and received correctly.
Finally, the server still waits for further connections.
Updates:
Changed PipeOptions.Asynchronous to PipeOptions.None, because otherwise it seemed to hang for the duration of the requests and only then process them all at once.
PipeOptions.Asynchronous is simply causing a different order of execution than PipeOptions.None, and that's exposing a race condition / deadlock in your code. You can see the effect of it if you use Task Manager, for example, to monitor the thread count of your process... you should see it creeping up at a rate of appx 1 thread per second, until it gets to around 100 threads (maybe 110 or so), at which point your code runs to completion. Or if you add ThreadPool.SetMinThreads(200, 200) at the beginning. Your code has a problem where if the wrong ordering occurs (and that's made more likely by using Asynchronous), you create a cycle where it can't be satisfied until there are enough threads to run all of the concurrent ConnectAsyncs your main method has queued, which aren't truly async and instead just create a work item to invoke the synchronous Connect method (this is unfortunate, and it's issues like this that are one of the reasons I urge folks not to expose async APIs that simply queue works items to call sync methods). Source.
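To see that effect in isolation, here is a minimal sketch (my own, not from the quoted source) of the question's Main method with the ThreadPool.SetMinThreads call mentioned in the quote added up front; 200 is just the illustrative value from the quote, not a recommendation, and HandleRequestAsync/SendRequestAsync are the methods from the question's code.
// Sketch only: raises the thread-pool minimum before queuing the clients,
// which lets the PipeOptions.Asynchronous version from the question finish
// instead of waiting for the pool to slowly inject threads.
private static async Task Main()
{
    ThreadPool.SetMinThreads(200, 200); // requires using System.Threading;

    List<Task> tasks = new List<Task> { HandleRequestAsync() };
    tasks.AddRange(Enumerable.Range(0, 10).Select(i => SendRequestAsync(i, 0, 5)));

    await Task.WhenAll(tasks);
}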
Revised and simplified the example:
There's no true asynchronous Connect method for pipes; ConnectAsync uses Task.Factory.StartNew behind the scenes, so you might just as well use Connect directly and pass the method that calls the synchronous version (CreateConnection in the example above) to Task.Factory.StartNew.
The server is completely asynchronous now and as far as I can tell it works with no issues.
Fixed all of the BeginXXX/EndXXX methods.
Removed unnecessary try/catch blocks.
Removed unnecessary messages.
Refactored the code a bit to make it more readable and concise.
Removed the async/await version of the server; I refactored the code and didn't have time to update that version, but the code above should give you an idea of how to do it, and the newer APIs are much friendlier and easier to work with (a minimal sketch follows below).
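For completeness, here is a minimal sketch (mine, not the removed original) of what an async/await server loop for the same protocol could look like, reusing the "PipesDemo" pipe name and the 4-byte index messages from the code above; it assumes the same usings as PipeServer.cs plus System.Threading.Tasks.
// Sketch only: accept a connection, hand it off, and immediately go back to
// accepting, so multiple clients can be served concurrently.
private static async Task HandleConnectionsAsync()
{
    while (true)
    {
        var server = new NamedPipeServerStream("PipesDemo",
            PipeDirection.InOut,
            NamedPipeServerStream.MaxAllowedServerInstances,
            PipeTransmissionMode.Message,
            PipeOptions.Asynchronous);

        await server.WaitForConnectionAsync().ConfigureAwait(false);

        // Serve the connected client without blocking the accept loop.
        _ = Task.Run(async () =>
        {
            using (server)
            {
                var buffer = new byte[4];
                var bytesRead = await server.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
                if (bytesRead > 0)
                {
                    // Echo the 4-byte index back to the client.
                    await server.WriteAsync(buffer, 0, bytesRead).ConfigureAwait(false);
                    await server.FlushAsync().ConfigureAwait(false);
                }

                server.Disconnect();
            }
        });
    }
}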
I hope it helps.
Upvotes: 6
Reputation: 5850
When disconnecting, WaitForPipeDrain() can throw an IOException due to a broken pipe.
If this happens in your server Task, then it will never listen for the next connection, and all of the remaining client connections hang on ConnectAsync().
If this happens in one of the client Tasks, then it will not continue to recurse and increment the counter for that index.
If you wrap the call to WaitForPipeDrain() in a try/catch, the program will continue running forever, because your function HandleRequestAsync() is infinitely recursive.
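For illustration, here is a minimal sketch of those two changes applied to the question's HandleRequestAsync; the loop bound, parameter name, and comments are mine, and it assumes a using System.IO; directive for IOException.
// Sketch only: loop a bounded number of times instead of recursing forever,
// and tolerate a broken pipe while draining.
private static async Task HandleRequestAsync(int maxConnections)
{
    for (var i = 0; i < maxConnections; i++)
    {
        using (var server = new NamedPipeServerStream("MyPipe",
            PipeDirection.InOut,
            NamedPipeServerStream.MaxAllowedServerInstances,
            PipeTransmissionMode.Message,
            PipeOptions.Asynchronous))
        {
            await server.WaitForConnectionAsync().ConfigureAwait(false);

            // ... read the request and write the response here ...

            try
            {
                await server.FlushAsync().ConfigureAwait(false);
                server.WaitForPipeDrain();
            }
            catch (IOException)
            {
                // The client has already gone away; ignore the broken pipe.
            }

            if (server.IsConnected)
            {
                server.Disconnect();
            }
        }
    }
}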
In short, to get this to work:
Catch the IOException from WaitForPipeDrain().
HandleRequestAsync() has to finish at some point.
Upvotes: 4