Prasad
Prasad

Reputation: 139

Using pipes for IPC in same process using C#

I have two threads. One threads reads requests and passes it to server using message queue and other thread reads the response from message queue and sends it back. In the same process the caller class method writes the request on the pipe (using the server pipe stream shared by first thread) and then reads the response using the client pipe stream shared by the second thread. This can be easily done using Java PipeInputStream and PipeOutputStream as follows. Essentially I am looking for equivalent of following Java logic in C#. I tried unsuccessfully using anonymous pipes in C#.

RequestHandlerThread (Thread1 as mentioned above)

out = new PipedOutputStream();
readPipeIs = new PipedInputStream(out);
readDataIs = new DataInputStream(readPipeIs);
// read data from readDataIs
// Send it to server over message queue
// Share 'out' so that other class method can write to it. 

Response Handler (Thread 2 as mentioned above)

in = new PipedInputStream();
writePipeOs = new PipedOutputStream(in);
writeDataOs = new DataOutputStream(writePipeOs);

// Wait and read from message queue
// write received data to 'writeDataOs'
// Share 'in' so that other class method can read from it. 

I am not sure if C# pipes are restricted for communicating between two processes. All the above logic is in same process just that there are two threads to communicate with message server.

I tried pair of AnonymousPipeServerStream and AnonymousPipeClientStream pair in both threads. I shared the server stream for writing and client stream for reading by other class method.

Any obvious flaw in above logic or any suggestions with choice of IPC ?

Adding source code Here is the Test class

class Test
{
    private static byte[] ret;
    private static bool ready;

    Stream outStream;
    Stream inStream;


    private void clientConnReqHandler()
    {
        AnonymousPipeServerStream pipeServer = new
              AnonymousPipeServerStream(PipeDirection.Out);

        outStream = pipeServer;

        string pipeHandle = 
                     pipeServer.GetClientHandleAsString();

        AnonymousPipeClientStream pipeClient =
            new AnonymousPipeClientStream(PipeDirection.In, 
                       pipeHandle);

        pipeServer.DisposeLocalCopyOfClientHandle();

        ready = false;
        BinaryReader binReader = new BinaryReader(pipeClient);
        int mesgSize = binReader.ReadInt32();
        System.Console.WriteLine("Message Lenght To read: " + 
                                  mesgSize);
        byte[] buffer = binReader.ReadBytes(mesgSize);
        System.Console.WriteLine("Message read: " + 
                    buffer.ToString());
        // Simulate some processing 
        Thread.Sleep(5000);
        mesgProcessing(buffer);

    }
    private static void mesgProcessing(byte[] buffer)
    {

        System.Text.UTF8Encoding encoding = new 
                            System.Text.UTF8Encoding();
        byte[] extra = encoding.GetBytes("Echo : ");

        ret = new byte[buffer.Length + extra.Length];
        System.Buffer.BlockCopy(extra, 0, ret, 0, extra.Length);
        System.Buffer.BlockCopy(buffer, 0, ret, extra.Length, 
                                buffer.Length);
        ready = true;
    }


    private void clientConnRespHandler()
    {
        AnonymousPipeServerStream pipeServer = new 
                AnonymousPipeServerStream(PipeDirection.Out);

        string pipeHandle = 
                  pipeServer.GetClientHandleAsString();

        AnonymousPipeClientStream pipeClient =
            new AnonymousPipeClientStream(PipeDirection.In, 
                  pipeHandle);

        inStream = pipeClient;
        pipeServer.DisposeLocalCopyOfClientHandle();

        while (ready)
        {
            BinaryWriter binWriter = new 
                           BinaryWriter(pipeServer);
            binWriter.Write(ret.Length);
            binWriter.Write(ret);
            ready = false;
        }
    }

    public static void Main()
    {
        Test setup = new Test();
        setup.threadTest();

        Test2 threadTest = new Test2();
        // This method will do actuall read and write. 
        threadTest.runTest(setup.inStream, setup.outStream);
    }
    public void threadTest()
    {
        Thread reqHandlerThread = new Thread(new 
                ThreadStart(clientConnReqHandler));
        Thread respHandlerThread = new Thread(new 
               ThreadStart(clientConnRespHandler));

        reqHandlerThread.Start();
        respHandlerThread.Start();

    }
}

The class that does read/write:

class Test2
{

    internal void runTest(System.IO.Stream inStream, 
                  System.IO.Stream outStream)
    {
        BinaryWriter writer = new BinaryWriter(outStream);

        System.Text.UTF8Encoding encoding = new 
                 System.Text.UTF8Encoding();
        byte[] mesg = encoding.GetBytes("Hello World!!!");

        writer.Write(mesg.Length);
        writer.Write(mesg);

        BinaryReader reader = new BinaryReader(inStream);
        int mesgSize = reader.ReadInt32();
        System.Console.WriteLine("Message Lenght To read: " + 
                      mesgSize);
        byte[] buffer = reader.ReadBytes(mesgSize);
        System.Console.WriteLine("Message read: " + 
                    buffer.ToString());
    }
}

thanks

Upvotes: 3

Views: 4142

Answers (1)

Prasad
Prasad

Reputation: 139

OK. It worked after getting rid of DisposeLocalCopyOfClientHandle(). Of course had to fix some new-by mistakes of while loop condition to check if the data is ready and printing the string correctly from byte array.

Upvotes: 0

Related Questions