Greg B Roberts
Greg B Roberts

Reputation: 173

Is a named pipe able to do what i want to do?

This is take II, i posted the other week and my question was put on hold, i adjusted my text but could not get a review, and the system closed the original post.

Logic

Server Side: Read only - Server Opens pipe then at regular interval checks if there is content (i.e. not at end of stream) and reads info. This check has to be poll based as only during the poll is there a valid context to pass on the data..

Client Side: Write-only - Open pipe, write to pipe, close (client.exe called many times, has short life span, code below is test code), e.g. some other script will "call client.exe with info"

Code snippets

    for (int i = 0; i < 10; i++)
    {
        //Client - simulate exe starting and ending
        var client = new NamedPipeClientStream(".", "PipesOfPiece", PipeDirection.Out, PipeOptions.WriteThrough);
        client.Connect();
        StreamWriter writer = new StreamWriter(client);

        Console.WriteLine("Client about to send message");
        writer.WriteLine("Called from client i = {0}", i);

        writer.Close();
        client.Close();

        Thread.Sleep(5000);
    }
    // server snippet
        var server = new NamedPipeServerStream("PipesOfPiece", PipeDirection.In);
        server.WaitForConnection(); <= can this we optional with code below
        StreamReader reader = new StreamReader(server);
        while (true)
        {
            // simulate start of poll code
            if (server.IsConnected)
            {
                if (!reader.EndOfStream)
                {
                    var line = reader.ReadToEnd();
                    Console.WriteLine("Server: {0}", line);
                }
            } // End of poll code
            Thread.Sleep(1000);
        }
      // server snippet
    var server = new NamedPipeServerStream("PipesOfPiece", PipeDirection.In);
        server.WaitForConnection(); <= can this we optional with code below
        StreamReader reader = new StreamReader(server);
        while (true)
        {
            // simulate start of poll code
            if (server.IsConnected)
            {
                if (!reader.EndOfStream)
                {
                    var line = reader.ReadToEnd();
                    Console.WriteLine("Server: {0}", line);
                }
            } // End of poll code
            Thread.Sleep(1000);
        }

So I am rusty on my pipes, I am hoping that a pipe can be opened, written to then read, and the waitforconnect() is there for the cases where you want this and is optional. I suppose it all triggers around who owns the pipe, i.e. if the server opens a pipe and is waiting for someone to write for it, why does it need to wait for a connect ? (I am hoping the server is the owner so when it ends, the pipe disappears)

Upvotes: 2

Views: 3106

Answers (1)

Peter Duniho
Peter Duniho

Reputation: 70701

Without a good, minimal, complete code example that reliably reproduces whatever specific problem you are having, it is impossible to provide specific advice as to how to fix that problem. However, I can at least try to answer some of your questions about how named pipes can be used, and provide a code example to illustrate some of the concepts.


First, some rules and observations:

  • A pipe instance can be used for only one connection. Note that pipes inherit Stream, and streams have a very specific paradigm: you open one, read to the end, and then you're done with the stream. Some streams, like FileStream, are seekable but even there you are only ever dealing with a single resource (i.e. the original file…you can't reconnect a FileStream to a different file), and network streams aren't even seekable.
  • A pipe must be connected before you perform I/O on it.
  • You may have multiple instances of pipes with the same name (if you initialize them correctly…by default, you may only have one pipe of any given name).
  • Clients trying to connect to a named pipe will wait until such a pipe exists. It does not need to exist at the time the client initiates the connection.
  • Only one client can connect to any given instance of a pipe. Any given instance of a server pipe can only ever handle a single client during its entire lifetime (see the very first point above).

So, what about your questions?


Can this work flow be handled in pipes ?

If I understand the work-flow correctly, yes. But you need to be careful to implement it correctly.

As I understand it, you want for your server to only attempt to read from clients periodically. At the same time, you want for a client to be able to write to a pipe at any time. This can be done, but it won't be straightforward.

Note per the above that you cannot open a single server pipe, and then let multiple clients periodically connect and disconnect from that pipe. Once the first client has connected, the pipe is no longer usable. It's a stream, and that first client's disconnection causes the stream to reach its end. It's done.

Note also that while a client can attempt to connect to a pipe that doesn't exist yet, it will wait until it can. So if you want your clients to not have to wait until the polling interval has expired, you'll need to maintain a server pipe available to connect to at all times.

But you've already said that you won't be able to handle data read from the server pipe at arbitrary points in time, but rather only during your polling interval.

Because pipes don't inherently support this specific scenario, IMHO the right way to implement it is to separate the behaviors into two different components. Maintain a simple pipe server that opens a pipe, waits for a client to connect, reads whatever the client has sent, closes the pipe, and then starts over.

Then have an intermediary class that can act as the go-between for the server I/O and whatever component you have that ultimately receives the data. This intermediary will retain a copy of the data after it's been received (the pipe code will deliver it to the intermediary as soon as it's received, regardless of the polling interval); the data will then later be retrieved by the polling component on its next polling interval (i.e. when the "context" as you put it is in fact available to which to deliver the data).

I am hoping that a pipe can be opened, written to then read, and the waitforconnect() is there for the cases where you want this and is optional

Unfortunately, your hope doesn't match the reality. Pipes can be bidirectional; i.e. "written to then read". But WaitForConnect() is not optional. The server must wait for a connection before attempting to read from the pipe, and for that pipe instance it will only ever be able to receive data from a single client.

I am hoping the server is the owner so when it ends, the pipe disappears

The server process is the one that actually creates the pipe. So yes, in that sense it is the owner. And yes, when the server process is terminated, any pipes it's created are destroyed.


Below, please find a simple code example that illustrates the use of multiple and concurrent servers and clients. You can adjust the numbers of each with the declared constants at the top of the example.

When running it, note that if more clients are active than servers, the additional clients will simply wait until a server pipe is available to connect to. Once one is, they will connect and proceed normally. If there are at least as many server pipe instances as there are clients trying to connect, all of the clients are serviced concurrently.

// NOTE: as a sample program, contrary to normal and correct
// programming practices error-handling has been omitted, and
// non-awaited async methods have been declared as void.
class Program
{
    private const string _kserverName = "TestSO33093954NamedPipeClients";
    private const int _kmaxServerCount = 3;
    private const int _kmaxClientCount = 3;

    static void Main(string[] args)
    {
        StartServers(_kmaxServerCount);
        StartClients(_kmaxClientCount);

        Console.WriteLine("Clients are being started. Press return to exit program.");
        Console.ReadLine();
    }

    private static async void StartClients(int clientCount)
    {
        for (int i = 0; i < clientCount; i++)
        {
            RunClient(i);
            await Task.Delay(300);
        }
    }

    private static async void RunClient(int instance)
    {
        NamedPipeClientStream client = new NamedPipeClientStream(
            ".", _kserverName, PipeDirection.InOut, PipeOptions.Asynchronous);

        client.Connect();

        ReadClient(client);

        using (StreamWriter writer = new StreamWriter(client))
        {
            writer.AutoFlush = true;

            for (int i = 0; i < 5; i++)
            {
                string text =
                    string.Format("Instance #{0}, iteration #{1}", instance, i);

                Console.WriteLine("Client send: " + text);
                await writer.WriteLineAsync(text);
                await Task.Delay(1000);
            }

            client.WaitForPipeDrain();
        }
    }

    private static async void ReadClient(Stream stream)
    {
        using (TextReader reader = new StreamReader(stream))
        {
            string line;

            while ((line = await reader.ReadLineAsync()) != null)
            {
                Console.WriteLine("Client recv: " + line);
            }
        }
    }

    private static void StartServers(int maxServerInstances)
    {
        for (int i = 0; i < maxServerInstances; i++)
        {
            RunServer(maxServerInstances);
        }
    }

    private static async void RunServer(int maxServerInstances)
    {
        while (true)
        {
            using (NamedPipeServerStream server = new NamedPipeServerStream(
                _kserverName, PipeDirection.InOut, maxServerInstances,
                 PipeTransmissionMode.Byte, PipeOptions.Asynchronous))
            {
                await server.WaitForConnectionAsync();

                byte[] buffer = new byte[1024];
                int bytesRead;
                Decoder decoder = Encoding.UTF8.GetDecoder();

                while ((bytesRead =
                    await server.ReadAsync(buffer, 0, buffer.Length)) > 0)
                {
                    int cch = decoder.GetCharCount(buffer, 0, bytesRead);
                    char[] rgch = new char[cch];

                    decoder.GetChars(buffer, 0, bytesRead, rgch, 0);
                    Console.Write("Server recv: " + new string(rgch));

                    await server.WriteAsync(buffer, 0, bytesRead);
                }
            }
        }
    }
}

static class PipeExtensions
{
    // As I am not running with .NET 4.6 yet, I need this little helper extension
    // to wrap the APM-based asynchronous connection-waiting with the await-friendly
    // Task-based syntax. Anyone using .NET 4.6 will have this in the framework already
    public static Task WaitForConnectionAsync(this NamedPipeServerStream server)
    {
        return Task.Factory.FromAsync(
            server.BeginWaitForConnection, server.EndWaitForConnection, null);
    }
}

Upvotes: 6

Related Questions