Reputation: 109
I am new to multitasking
and IPC
and I am trying to construct an approach for fast inter process comunication using shared memory( at first I was researching the IPC term, having in mind wcf
sockets
and named pipes
only to eventually discover about MMF).
Now that I have successfully implemented a small test using shared memory between two processes via using Lock
and EventWaitHandle
signalling, I am up to an approach that implements non blocking / no-wait pattern. Now, I am trying to combine Thread.MemoryBarrier()
& reading a signalling Sector
from the MemoryMapedFile
.
The problem is unidentified! First round goes through and second was last sited in Bermuda triangle ... out of the scope of the debugger...
say process a is sending a burst of requests for a showMsg()
to process b.
//offset positions in mmf
MemoryMappedViewAccessor MmfAcc; const int opReady= 0, opCompleteRead = 4, .....
ReadTrd()
{
//[0,3] - Reader is stationed
//[4,7] - Read Complete successfully
//[8,11] - Data-size
//[12,15] - Reader-exiting
"format" the signals Section (write zeroes).
for(;;){if (WrTrd-StepMMF1 Confimed) break;}
MmfAcc- read DataSize val @offset[8]
MmfAcc- read Data val @offset[50]
MmfAcc Write exit to offset....
....heavy use of Thread.MemoryBarrier(); sets !!! (all over the place, on every shared variable...)
}
writeTrd()
{
heavy use of Thread.MemoryBarrier() !!!
//[15-19] - wr is stationed
//[20-23] - wr Complete successfully
//[24-27] - wrExiting
"format" the signals Section .
for(;;){if Reader-StepMMF1 is Confim break;}
MmfAcc- DataSize to offset[8]
write Data To offset[50] using the method below
for(;;){if Read StepMMF2 is Confim break;}
}
As I was first using the named piped solution, the Mmf approach(despite Lock
and EventWaitHandle) was great performance gain compared to the namedpipe approach, but could I even go further using the above approach somehow.. ?
I could just clone this pattern like striping Raid ...
Reader1 + Reader2 & WriteThred1 + WriteThread2
so I tried it and got stuck at that point.
Is this valid approach using the full-memoryfence & sharedmemory for signalling?
If so, all is left is to see why the second iteration failed , the performance difference.
EDIT - added the logic behind the extra threads test
This is the "Bridge" I am using to manipulate writer threads (same approach for readers.
public void Write(byte[] parCurData)
{
if (ReadPosition < 0 || WritePosition < 0)
throw new ArgumentException();
this.statusSet.Add("ReadWrite:-> " + ReadPosition + "-" + WritePosition);
// var s = (FsMomitorIPCCrier)data;
////////lock (this.dataToSend)
////////{
Thread.MemoryBarrier();
LiveDataCount_CurIndex = dataQue.Where(i => i != null).Count();
this.dataQue[LiveDataCount_CurIndex] = parCurData;
Console.WriteLine("^^^^^" + Thread.CurrentThread.Name + " has Entered WritingThreads BRIDGE");
Console.WriteLine("^^^^^[transactionsQue] = {1}{0}^^^^^[dataQue.LiveDataASIndex = {2}{0}^^^^^[Current Requests Count = {3}{0}", "\r\n", Wtransactions, LiveDataCount_CurIndex, ++dataDelReqCount);
//this.itsTimeForWTrd2 = false;
if (Wtransactions != 0 && Wtransactions > ThrededSafeQ_Initial_Capcity - 1)
if (this.dataQueISFluded) this.DataQXpand();
if (itsTimeForWTrd2)
{
bool firstWt = true;
while (writerThread2Running)
{
if (!firstWt) continue;
Console.WriteLine("SECOND WRITERThread [2] is In The CoffeeCorner");
firstWt=false;
}
this.dataDelivery2 = this.dataQue[LiveDataCount_CurIndex];
Console.WriteLine("Activating SECOND WRITERThread [2]");
itsTimeForWTrd2 = false;
writerThread2Running = true;
//writerThread1Running = true;
writerThread2 = new System.Threading.Thread(WriterThread2);
writerThread2.IsBackground = true;
writerThread2.Name = this.DepoThreadName + "=[WRITER2]";
writerThread2.Start();
}
else
{
bool firstWt = true;
while (writerThread1Running)
{
if (!firstWt)continue;
Console.WriteLine("WRITERThread [1] is In The CoffeeCorner");
firstWt=false;
}
Console.WriteLine("Activating WRITERThread [1]");
this.dataDelivery1 = this.dataQue[LiveDataCount_CurIndex];
writerThread1Running = true;
writerThread1 = new System.Threading.Thread(WriterThread1);
writerThread1.IsBackground = true;
writerThread1.Name = this.DepoThreadName+"=[WRITER1]";
writerThread1.Start();
itsTimeForWTrd2 = true;
}
Thread.MemoryBarrier();
}
Using the write handle to read & write the actual data (similar code for the write)
public unsafe byte[] UsReadBytes(int offset, int num)
{
byte[] arr = new byte[num];
byte* ptr = (byte*)0;
this.accessor.SafeMemoryMappedViewHandle.AcquirePointer(ref ptr);
Marshal.Copy(IntPtr.Add(new IntPtr(ptr), offset), arr, 0, num);
this.accessor.SafeMemoryMappedViewHandle.ReleasePointer();
return arr;
}
As I said, I have researched this issue of synchronization of the data and shared memory via non blocking / no-wait / etc. Semaphores Locks so I am trying to remove any kind of overhead during the process of each transaction of data into the shared memory mapped file. I am here to ask what could be the problem eliminating the Lock And The EventWaitHandle and replacing it with the logic of memory fences and signaling through the mmf?
Upvotes: 6
Views: 1863
Reputation: 664
If you are planning to use this for an specific purpose other than R&D, the easiest way would be to use a library that already provides that. One way to think about this is, Lock Free = Message Passing. Two possible approaches are: For a minimalistic message-passing implementation, ZeroMQ IPC, which provides great .Net support and superb IPC performance (http://www.codeproject.com/Articles/488207/ZeroMQ-via-Csharp-Introduction) and for a more complete Actor-Model implementation (including support for IPC), look at Akka.net (http://getakka.net/docs/#networking).
If the purpose is more R&D in nature (meaning, you want to write your own implementation, which is cool), I would still suggest to look at the source of these products (especially, Akka.net, since it's written in C#), for implementation ideas about Message-passing and Actor-based IPC.
Upvotes: 0