feihtthief
feihtthief

Reputation: 7063

Easiest way to signal an event to several processes in .NET

I was hoping there was an easy way to signal an event to several processes that did not involve me writing a custom socket listener. I'm trying to inform several applications that will need to update their cached configuration settings that a config change has occurred. I wanted to implement a "host wide" singleton, but have failed to find any examples. Is such a thing even possible?

Upvotes: 8

Views: 14515

Answers (5)

ekalchev
ekalchev

Reputation: 962

@antiduh idea is great. I don't understand why he don't post entire source code. Here I added what he (unintentionally?) omitted in his post.

I also replaced Semaphores with AutoResetEvent

using System;
using System.Collections.Generic;
using System.IO.MemoryMappedFiles;
using System.IO;
using System.Text;
using System.Threading;
using System.Runtime.CompilerServices;
using System.Linq;
using System.Diagnostics;
using System.Runtime.ConstrainedExecution;
using System.Xml.Linq;

namespace Namespace
{
    /// <summary>
    /// Can be used to signal multiple processes
    /// The shared memory region that stores the registrations has the following structure:
    ///
    ///      +---- 4 Bytes ----+
    ///      |   NumListeners  |
    ///      +-----------------+
    ///      |   Listener ID   |
    ///      +-----------------+
    ///      |   Listener ID   |
    ///      +-----------------+
    ///      |       ...       |
    ///      +-----------------+
    ///
    /// By using Listener ID, we can construct cross process EventWaitHandle name that the listener is waiting to be signaled
    ///
    /// </summary>
    public sealed class SharedEventWaitHandle : IDisposable
    {
        private MemoryMappedFile registrations;
        private string registrationLockName;
        private string registrationsName;
        private int localWaitHandleId;
        private EventWaitHandle localWaitHandle;
        private Mutex registrationMutex;

        public SharedEventWaitHandle(string name, int maxListeners = 1024)
        {
            Name = name;
            MaxListeners = maxListeners;

            localWaitHandleId = -1;

            RuntimeHelpers.PrepareConstrainedRegions();

            registrationsName = GenerateRegistrationSharedMemoryFileName(name);
            registrationLockName = GenerateRegistrationLockName(name);

            registrationMutex = new Mutex(false, registrationLockName);

            try
            {
                AcquireRegistrationMutex();

                try
                {
                    registrations = MemoryMappedFile.CreateOrOpen(
                        registrationsName,
                        sizeof(int) + (maxListeners * sizeof(int)),
                        MemoryMappedFileAccess.ReadWrite,
                        MemoryMappedFileOptions.None,
                        HandleInheritability.None);

                    RegisterSelf();
                }
                finally
                {
                    ReleaseRegistrationMutex();
                }
            }
            catch
            {
                Dispose();
                throw;
            }
        }

        public string Name { get; }
        public int MaxListeners { get; }
        public WaitHandle WaitHandle => localWaitHandle;

        public void Dispose()
        {
            UnregisterSelf();
            registrationMutex.Dispose();
        }

        public void Trigger(bool suppressSelfHandler = false)
        {
            bool modifiedList = false;

            // This is not supported in dotnet core but it is not needed
            // The lack of AppDomains, ThreadAbortException remove the need of this call

            // The finally block is a ConstrainedExecutionRegion... we definitely don't want to
            // deadlock the whole shared event because we crashed while holding the lock.
            RuntimeHelpers.PrepareConstrainedRegions();

            AcquireRegistrationMutex();

            try
            {
                List<int> ids = ReadListenerIds();

                for (int i = 0; i < ids.Count; /* conditional increment */ )
                {
                    int memberId = ids[i];

                    if (suppressSelfHandler && memberId == localWaitHandleId)
                    {
                        i++;
                        continue;
                    }

                    EventWaitHandle handle = null;
                    try
                    {
                        handle = GetListenerWaitHandle(memberId, memberId == localWaitHandleId);

                        if (handle == null)
                        {
                            // The listener's wait handle is gone. This means that the listener died
                            // without unregistering themselves.
                            ids.RemoveAt(i);
                            modifiedList = true;
                        }
                        else
                        {
                            try
                            {
                                handle.Set();
                            }
                            catch (ObjectDisposedException) { }

                            i++;
                        }
                    }
                    finally
                    {
                        handle?.Dispose();
                    }
                }

                if (modifiedList)
                {
                    WriteListenerIds(ids);
                }
            }
            finally
            {
                ReleaseRegistrationMutex();
            }
        }

        private static string GenerateRegistrationLockName(string name)
        {
            return @"Local\registration.lock." + name;
        }

        private static string GenerateRegistrationSharedMemoryFileName(string name)
        {
            return @"Local\shared.memory." + name;
        }

        private static string GenerateRegistrationWaitHandleName(string name, int memberId)
        {
            return @"Local\wait.handle." + name + memberId;
        }

        private void AcquireRegistrationMutex()
        {
            try
            {
                registrationMutex.WaitOne();
            }
            catch (AbandonedMutexException) { };
        }

        private void ReleaseRegistrationMutex()
        {
            registrationMutex.ReleaseMutex();
        }

        private void RegisterSelf()
        {
            // this method should be called when registration lock is held
            List<int> ids = ReadListenerIds();

            if (ids.Count >= MaxListeners)
            {
                throw new InvalidOperationException(
                    "Cannot register self with SharedEvent - no more room in the shared memory's registration list. Increase 'MaxListeners'.");
            }

            localWaitHandleId = GenerateNextId(ids);
            localWaitHandle = GetListenerWaitHandle(localWaitHandleId, true);

            ids.Add(localWaitHandleId);
            ids.Sort();

            WriteListenerIds(ids);
        }

        private int GenerateNextId(List<int> ids)
        {
            if (ids?.Count > 0)
            {
                return ids.Max() + 1;
            }

            return 0;
        }

        private EventWaitHandle GetListenerWaitHandle(int memberId, bool localWaitHandle)
        {
            EventWaitHandle waitHandle = null;

            // This is not supported in dotnet core but it is not needed
            // The lack of AppDomains and ThreadAbortException remove the need of this call in dotnet core
            RuntimeHelpers.PrepareConstrainedRegions();

            try
            {
                waitHandle = new EventWaitHandle(false, EventResetMode.AutoReset, GenerateRegistrationWaitHandleName(Name, memberId), out bool createdNew);

                if (localWaitHandle == false && createdNew == true)
                {
                    // if we expect to find non-local wait handle, it must be already created
                    // in case of createdNew == true that means the process that handle
                    // belongs has died. In this case we should close the handle and return null
                    waitHandle.Dispose();
                    waitHandle = null;
                }

                return waitHandle;
            }
            catch
            {
                waitHandle?.Dispose();
                throw;
            }
        }

        private void UnregisterSelf()
        {
            // This is not supported in dotnet core but it is not needed
            // The lack of AppDomains and ThreadAbortException remove the need of this call in dotnet core
            RuntimeHelpers.PrepareConstrainedRegions();

            AcquireRegistrationMutex();

            try
            {
                List<int> ids = ReadListenerIds();

                if (localWaitHandleId != -1 && ids.Contains(this.localWaitHandleId))
                {
                    ids.Remove(localWaitHandleId);
                }

                if (localWaitHandle != null)
                {
                    localWaitHandle.Dispose();
                    localWaitHandle = null;
                }

                WriteListenerIds(ids);
                registrations?.Dispose();
            }
            finally
            {
                ReleaseRegistrationMutex();
            }
        }

        private List<int> ReadListenerIds()
        {
            int numMembers;
            int[] memberIds;
            int position = 0;

            using (var view = this.registrations.CreateViewAccessor())
            {
                numMembers = view.ReadInt32(position);
                position += sizeof(int);

                memberIds = new int[numMembers];

                view.ReadArray(position, memberIds, 0, numMembers);
                position += sizeof(int) * numMembers;
            }

            return new List<int>(memberIds);
        }

        private void WriteListenerIds(List<int> listenerIds)
        {
            int position = 0;

            using (var view = registrations.CreateViewAccessor())
            {
                view.Write(position, listenerIds.Count);
                position += sizeof(int);

                foreach (int id in listenerIds)
                {
                    view.Write(position, id);
                    position += sizeof(int);
                }
            }
        }
    }
}

Upvotes: 1

antiduh
antiduh

Reputation: 12407

I solved this problem using the following parts:

  • A single shared/named memory region.
  • A single shared/named semaphore that protects access to that shared memory region.
  • A per-process named semaphore that, when invoked, triggers the event locally in that process.

My solution relies on the following tricks:

  • Shared semaphores can tell you when you're the first to open them, via the out createNew parameter.
  • When a shared memory region is opened and nobody was holding a handle to it previously (you're the first to open it), the shared memory region happens to be initialized to all zeros. Thanks Windows!
  • You can register wait handles with the thread pool, providing the registration a callback. Then when the wait handle becomes signaled, the threadpool will run your callback.
  • Named memory regions and named semaphores are automatically destroyed when the last process holding a handle ends, even if the process ended by crashing.

My strategy is thus:

  • The shared memory region stores the number of registered listeners, as well as each listeners unique ID.
  • Each listener creates their per-process lock using a well-known named semaphore based on their unique ID when they register themselves with the shared memory region.
  • Each listener registers their per-process lock with the thread pool, so that when it gets popped, they'll get a callback on their handler method.
  • When one process wants to trigger the event in all processes, he looks up all registered IDs in the shared memory region, then uses those IDs to open the per-process semaphores and pops them.
  • If, when we open someone else's shared semaphore, we find that the semaphore out createdNew parameter is true, we know that the process crashed without deregistering itself, so we ignore it and deregister it ourselves right then and there.

Some code:

The constructor, which opens the shared memory region, the shared memory region's lock, and registers this processes per-process lock:

    ...
    /// <remarks>
    /// ...
    /// The shared memory region that stores the registrations has the following structure:
    ///
    ///      +---- 4 Bytes ----+
    ///      |   NumListeners  |
    ///      +-----------------+
    ///      |   Listener ID   |
    ///      +-----------------+
    ///      |   Listener ID   |
    ///      +-----------------+
    ///      |       ...       |
    ///      +-----------------+
    ///
    /// ...
    /// </remarks>      
    public SharedEvent( string name, int maxListeners = 1024 )
    {
        this.Name = name;
        this.MaximumListeners = maxListeners;

        this.localWaitHandleId = -1;

        try
        {
            this.registrationLock = new Semaphore( 1, 1, RegistrationLockName() );

            this.registrations = MemoryMappedFile.CreateOrOpen(
                RegistrationShmemName(),
                4 + maxListeners * 4,
                MemoryMappedFileAccess.ReadWrite,
                MemoryMappedFileOptions.None,
                null,
                HandleInheritability.None
            );

            RegisterSelf();
        }
        catch
        {
            Dispose();
            throw;
        }
    }

The code that fires the event in all registrants:

    public void Trigger(bool suppressSelfHandler = false)
    {
        bool modifiedList = false;

        // The finally block is a ConstrainedExecutionRegion... we definitely don't want to
        // deadlock the whole shared event because we crashed while holding the lock.
        RuntimeHelpers.PrepareConstrainedRegions();
        this.registrationLock.WaitOne();
        try
        {
            List<int> ids = ReadListenerIds();

            for( int i = 0; i < ids.Count; /* conditional increment */ )
            {
                int memberId = ids[i];

                if( suppressSelfHandler && memberId == this.localWaitHandleId )
                {
                    i++;
                    continue;
                }

                Semaphore handle = null;
                try
                {
                    handle = GetListenerWaitHandle( memberId, false );

                    if( handle == null )
                    {
                        // The listener's wait handle is gone. This means that the listener died
                        // without unregistering themselves.

                        ids.RemoveAt( i );
                        modifiedList = true;
                    }
                    else
                    {
                        handle.Release();
                        i++;
                    }
                }
                finally
                {
                    handle?.Dispose();
                }
            }

            if( modifiedList )
            {
                WriteListenerIds( ids );
            }
        }
        finally
        {
            this.registrationLock.Release();
        }
    }

This shows how a process registers itself in the system:

    private void RegisterSelf()
    {
        RuntimeHelpers.PrepareConstrainedRegions();
        try
        {
            this.registrationLock.WaitOne();
            List<int> ids = ReadListenerIds();

            if( ids.Count >= this.MaximumListeners )
            {
                throw new InvalidOperationException(
                    "Cannot register self with SharedEvent - no more room in the shared memory's registration list. Increase 'maxSubscribers'."
                );
            }

            this.localWaitHandleId = FindNextListenerId( ids );

            ids.Add( this.localWaitHandleId );
            ids.Sort();

            this.localWaitHandle = GetListenerWaitHandle( this.localWaitHandleId, true );
            this.localWaitHandleReg = ThreadPool.RegisterWaitForSingleObject(
                this.localWaitHandle,
                this.WaitHandleCallback,
                null,
                -1,
                false
            );

            WriteListenerIds( ids );
        }
        finally
        {
            this.registrationLock.Release();
        }
    }

    private void UnregisterSelf()
    {
        RuntimeHelpers.PrepareConstrainedRegions();
        try
        {
            this.registrationLock.WaitOne();

            List<int> ids = ReadListenerIds();

            if( this.localWaitHandleId != -1 && ids.Contains( this.localWaitHandleId ) )
            {
                ids.Remove( this.localWaitHandleId );
            }

            if( this.localWaitHandleReg != null )
            {
                this.localWaitHandleReg.Unregister( this.localWaitHandle );
                this.localWaitHandleReg = null;
            }

            if( this.localWaitHandle != null )
            {
                this.localWaitHandle.Dispose();
                this.localWaitHandle = null;
            }

            WriteListenerIds( ids );
        }
        finally
        {
            this.registrationLock.Release();
        }
    }

The code that reads and writes the shmem registration lists:

    private List<int> ReadListenerIds()
    {
        int numMembers;
        int[] memberIds;
        int position = 0;

        using( var view = this.registrations.CreateViewAccessor() )
        {
            numMembers = view.ReadInt32( position );
            position += 4;

            memberIds = new int[numMembers];

            view.ReadArray( position, memberIds, 0, numMembers );
            position += sizeof( int ) * numMembers;
        }

        return new List<int>( memberIds );
    }

    private void WriteListenerIds( List<int> listenerIds )
    {
        int position = 0;

        using( var view = this.registrations.CreateViewAccessor() )
        {
            view.Write( position, (int)listenerIds.Count );
            position += 4;

            foreach( int id in listenerIds )
            {
                view.Write( position, (int)id );
                position += 4;
            }
        }
    }

Upvotes: 1

Franci Penov
Franci Penov

Reputation: 75981

For tightly coupled publisher/subscriber model (where the publisher is explicitly aware of all subscribers):

  • You can use a semaphore set by the publisher and have all subscribers wait on it. However, if any of the subscribers dies, your count will be thrown off. You will need to implement some form of a zombie detection

  • You could use COM connection points. This requires admin registration of the COM classes and type libraries.

For loosely coupled publisher/subscriber model (where the publisher doesn't know anythig about the subscribers):

  • If you configuration settigns are kept in a file or in the registry, the subscribers can implement a file or a registry change listener. Unfortunately, this solution is limited to file/registry changes, does not scale and can be subject to delays based on the file system/registry load.

  • You can use the COM+ Loosely Coupled Events (through System.EnterpriseServices). However, this could be an overkill for you due to the complexity of LCE.

  • You can broadcast a window message that the publisher registered through RegisterWindowMessage to all hidden top-level windows of particular class. All subscribers will need to create one such window. This would require some Win32 interop, but is probably the most lightweight way to implement loosely coupled publisher/subscriber.

Upvotes: 2

Jim Mischel
Jim Mischel

Reputation: 133975

You can use a named EventWaitHandle. However, you'd also need some way to reset the event after all of the processes listening were notified. You can probably do something like set the event and then reset it after some short period of time: one second or five seconds. Clients could know that the event won't trigger that quickly in succession.

Another possibility is to use a named Semaphore. But then you need to know how many listeners there are so that you can set the Semaphore's initial and maximum values.

There are ways to do what you're asking without having to build anything fancy.

Upvotes: 5

grega g
grega g

Reputation: 1089

Named semaphore and Named mutex are used for interprocess synchronization.

Msdn says:

Semaphores are of two types: local semaphores and named system semaphores. If you create a Semaphore object using a constructor that accepts a name, it is associated with an operating-system semaphore of that name. Named system semaphores are visible throughout the operating system, and can be used to synchronize the activities of processes.

Msdn says:

Named system mutexes are visible throughout the operating system, and can be used to synchronize the activities of processes. You can create a Mutex object that represents a named system mutex by using a constructor that accepts a name. The operating-system object can be created at the same time, or it can exist before the creation of the Mutex object.

Hope this helps

Upvotes: 4

Related Questions