SonOfPirate

Reputation: 5494

Implementing thread-safe, parallel processing

I am trying to convert an existing process so that it supports multi-threading and concurrency, to make the solution more robust and reliable.

Take the example of an emergency alert system. When a worker clocks in, a new Recipient object is created with their information and added to the Recipients collection. Conversely, when they clock out, the object is removed. In the background, when an alert occurs, the alert engine iterates through the same list of Recipients (foreach), calling SendAlert(...) on each object.

Here are some of my requirements:

I've been looking at the Task and Parallel classes as well as the BlockingCollection and ConcurrentQueue classes but am not clear what the best approach is.

Is it as simple as using a BlockingCollection? After reading a ton of documentation, I'm still not sure what happens if Add is called while I am enumerating the collection.

UPDATE

A colleague referred me to the following article, which describes the ConcurrentBag class and how each operation behaves:

http://www.codethinked.com/net-40-and-system_collections_concurrent_concurrentbag

Based on the author's explanation, it appears that this collection will (almost) serve my purposes. I can do the following:

  1. Create a new collection

    var recipients = new ConcurrentBag<Recipient>();

  2. When a worker clocks-in, create a new Recipient and add it to the collection:

    recipients.Add(new Recipient());

  3. When an alert occurs, the alert engine can iterate through the collection at that time because GetEnumerator uses a snapshot of the collection items.

    foreach (var recipient in recipients) recipient.SendAlert(...);

  4. When a worker clocks-out, remove the recipient from the collection:

    ???

The ConcurrentBag does not provide a way to remove a specific item. None of the concurrent classes do as far as I can tell. Am I missing something? Aside from this, ConcurrentBag does everything I need.

Upvotes: 0

Views: 370

Answers (4)

Drew Marsh

Reputation: 33379

ConcurrentBag<T> should be the best-performing class of the bunch for a case like this. Enumeration works exactly as your friend describes, so it should serve the scenario you have laid out. However, since you have to remove specific items from the set, the only type that will work for you is ConcurrentDictionary<K, V>. All the other types offer only a TryTake-style method: on ConcurrentBag<T> it removes an unspecified item, while ConcurrentQueue<T> and ConcurrentStack<T> remove strictly in order (TryDequeue and TryPop).

For broadcasting you would just do:

ConcurrentDictionary<string, Recipient> myConcurrentDictionary = ...;

...

foreach(Recipient recipient in myConcurrentDictionary.Values)
{
    ...
}

The Values property returns a snapshot of the dictionary's values at that instant, so the loop is not affected by concurrent additions and removals.
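To make the add and remove side concrete, here is a minimal sketch. It assumes each worker has a unique string key (a hypothetical BadgeId) and uses a stripped-down stand-in for Recipient; the RecipientRegistry class and its ClockIn, ClockOut and Broadcast methods are illustrative names, not anything from the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical stand-in for the Recipient class from the question.
public class Recipient
{
    public string BadgeId { get; }
    public int AlertsReceived;

    public Recipient(string badgeId) { BadgeId = badgeId; }

    // Placeholder: a real implementation would deliver the message.
    public void SendAlert(string message)
    {
        Interlocked.Increment(ref AlertsReceived);
    }
}

public class RecipientRegistry
{
    private readonly ConcurrentDictionary<string, Recipient> _recipients =
        new ConcurrentDictionary<string, Recipient>();

    // Clock-in: returns false if the badge is already registered.
    public bool ClockIn(Recipient r) => _recipients.TryAdd(r.BadgeId, r);

    // Clock-out: removes that specific recipient by key.
    public bool ClockOut(string badgeId) => _recipients.TryRemove(badgeId, out _);

    // Values copies the current values, so concurrent ClockIn/ClockOut
    // calls do not disturb the loop. Returns the number of alerts sent.
    public int Broadcast(string message)
    {
        int sent = 0;
        foreach (Recipient r in _recipients.Values)
        {
            r.SendAlert(message);
            sent++;
        }
        return sent;
    }
}
```

Keying by an ID is what makes removing a specific item possible; if your Recipient has no natural key, the object itself could serve as the key.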

Upvotes: 1

Gideon Engelberth

Reputation: 6155

Your requirements strike me as a good fit for the way standard .NET events are triggered in C#. I don't know offhand whether the VB syntax gets compiled to similar code. The standard pattern looks something like:

public event EventHandler Triggered;
protected void OnTriggered()
{
    //capture the list so that you don't see changes while the
    //event is being dispatched.
    EventHandler h = Triggered;
    if (h != null)
        h(this, EventArgs.Empty);
}

Alternatively, you could use an immutable list class to store the recipients. Then when the alert is sent, it will first take the current list and use it as a "snapshot" that cannot be modified by adding and removing while you are sending the alert. For example:

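class Alerter
{
    //assumes the ImmutableList implementation exposes an Empty instance
    private volatile ImmutableList<Recipient> recipients = ImmutableList<Recipient>.Empty;
    private readonly object sync = new object();

    public void Add(Recipient recipient)
    {
        //serialize writers so that concurrent Add/Remove calls
        //cannot overwrite each other's update
        lock (sync)
        {
            recipients = recipients.Add(recipient);
        }
    }

    public void Remove(Recipient recipient)
    {
        lock (sync)
        {
            recipients = recipients.Remove(recipient);
        }
    }

    public void SendAlert()
    {
        //make a local reference to the current list so
        //you are not affected by any calls to Add/Remove
        var current = recipients;
        foreach (var r in current)
        {
            //send alert to r
        }
    }
}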

You will have to find an implementation of an ImmutableList, but you should be able to find several without too much work. In the SendAlert method as I wrote it, I probably didn't need to make an explicit local to avoid problems as the foreach loop would have done that itself, but I think the copy makes the intention clearer.
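An assignment of the form recipients = recipients.Add(...) is a read-modify-write, so unless writers are serialized, two concurrent calls can overwrite each other. One alternative to locking, sketched below, assumes the System.Collections.Immutable package is available (the answer does not name a library) and uses its ImmutableInterlocked.Update helper, which retries the swap until it succeeds. The Recipient class here is a hypothetical stand-in.

```csharp
using System;
using System.Collections.Immutable;
using System.Threading;

// Hypothetical stand-in for the Recipient class from the question.
public class Recipient
{
    public int AlertsReceived;
}

public class Alerter
{
    private ImmutableList<Recipient> _recipients = ImmutableList<Recipient>.Empty;

    // Update re-runs the transformation if another thread replaced the
    // field in the meantime, so no Add or Remove is lost.
    public void Add(Recipient r) =>
        ImmutableInterlocked.Update(ref _recipients, list => list.Add(r));

    public void Remove(Recipient r) =>
        ImmutableInterlocked.Update(ref _recipients, list => list.Remove(r));

    // Returns the number of recipients alerted.
    public int SendAlert()
    {
        // Read the current list once; later Add/Remove calls produce
        // new lists and never change this one.
        ImmutableList<Recipient> current = Volatile.Read(ref _recipients);
        foreach (Recipient r in current)
        {
            Interlocked.Increment(ref r.AlertsReceived); // placeholder for sending
        }
        return current.Count;
    }
}
```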

Upvotes: -1

SonOfPirate

Reputation: 5494

I came into work this morning to an e-mail from a friend that gives me the following two answers:

1 - With regards to how the collections in the Concurrent namespace work, most of them are designed to allow additions and subtractions from the collection without blocking and are thread-safe even when in the process of enumerating the collection items.

With a "regular" collection, getting an enumerator (via GetEnumerator) sets a "version" value that is changed by any operation that affects the collection items (such as Add, Remove or Clear). The IEnumerator implementation will compare the version set when it was created against the current version of the collection. If different, an exception is thrown and enumeration ceases.

The Concurrent collections are designed using segments that make it very easy to support multi-threading. But, in the case of enumerating, they actually create a snapshot copy of the collection at the time GetEnumerator is called and the enumerator works against this copy. That allows changes to be made to the collection without adverse effects on the enumerator. Of course, this means that the enumeration will know nothing of these changes, but it sounds like your use case allows this.
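The difference is easy to see in a small experiment. The sketch below (EnumerationDemo and its method names are made up for illustration) modifies a List<T> and a ConcurrentBag<T> from inside their own foreach loops. ConcurrentBag<T> documents its enumerator as a moment-in-time snapshot; note that ConcurrentDictionary<K, V>'s own enumerator is documented differently (safe to use alongside writes, but not a snapshot), so check the type you pick.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class EnumerationDemo
{
    // Returns true if modifying a List<T> inside foreach throws.
    public static bool ListThrowsWhenModified()
    {
        var list = new List<int> { 1, 2, 3 };
        try
        {
            foreach (int item in list)
            {
                list.Add(item); // changes the list's internal version
            }
        }
        catch (InvalidOperationException)
        {
            return true; // raised when the enumerator moves to the next item
        }
        return false;
    }

    // Returns how many items the loop visited while adding to the bag.
    public static int BagItemsVisited(out int finalCount)
    {
        var bag = new ConcurrentBag<int> { 1, 2, 3 };
        int visited = 0;
        foreach (int item in bag)
        {
            bag.Add(item + 10); // not seen by this enumeration
            visited++;
        }
        finalCount = bag.Count;
        return visited;
    }
}
```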

2 - As far as the specific scenario you are describing, I don't believe that a Concurrent collection is needed. You can wrap a standard collection using a ReaderWriterLock and apply the same logic as the Concurrent collections when you need to enumerate.

Here's what I suggest:

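// using System.Collections.Generic;
// using System.Collections.ObjectModel;
// using System.Linq;      (for ToArray)
// using System.Threading;
public class RecipientCollection
{
    private readonly Collection<Recipient> _recipients = new Collection<Recipient>();
    private readonly ReaderWriterLock _lock = new ReaderWriterLock();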

    public void Add(Recipient r)
    {
        _lock.AcquireWriterLock(Timeout.Infinite);

        try
        {
            _recipients.Add(r);
        }
        finally
        {
            _lock.ReleaseWriterLock();
        }
    }

    public void Remove(Recipient r)
    {
        _lock.AcquireWriterLock(Timeout.Infinite);

        try
        {
            _recipients.Remove(r);
        }
        finally
        {
            _lock.ReleaseWriterLock();
        }
    }

    public IEnumerable<Recipient> ToEnumerable()
    {
        _lock.AcquireReaderLock(Timeout.Infinite);

        try
        {
            var list = _recipients.ToArray();

            return list;
        }
        finally
        {
            _lock.ReleaseReaderLock();
        }
    }
}

The ReaderWriterLock lets any number of readers proceed at the same time. An operation is blocked only while another operation that changes the collection's contents is in progress or, for a writer, while readers are still copying the collection. As soon as that operation completes, the lock is released and the next operation can proceed.

Your alert engine would use the ToEnumerable() method to obtain a snapshot copy of the collection at that time and enumerate the copy.

Depending on how often alerts are sent and how often the collection changes, copying the collection on every alert could become a performance issue. In that case you could add a version property that changes whenever an item is added or removed, so the alert engine can check it to decide whether it needs to call ToEnumerable() again. Alternatively, encapsulate this by caching the array inside the RecipientCollection class and invalidating the cache when an item is added or removed.
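A rough sketch of the caching idea follows. It is not the class above: to keep it short it uses a plain lock instead of a ReaderWriterLock, and the names CachedRecipientCollection and GetSnapshot are made up for illustration. The cached array is rebuilt only on the first read after a change.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the Recipient class from the question.
public class Recipient { }

public class CachedRecipientCollection
{
    private readonly object _sync = new object();
    private readonly List<Recipient> _recipients = new List<Recipient>();
    private Recipient[] _snapshot; // null means stale: rebuild on next read

    public void Add(Recipient r)
    {
        lock (_sync)
        {
            _recipients.Add(r);
            _snapshot = null; // invalidate the cache
        }
    }

    public bool Remove(Recipient r)
    {
        lock (_sync)
        {
            bool removed = _recipients.Remove(r);
            if (removed) _snapshot = null; // invalidate only on a real change
            return removed;
        }
    }

    // Returns the same array until the next Add/Remove.
    // Callers must treat the result as read-only.
    public IReadOnlyList<Recipient> GetSnapshot()
    {
        lock (_sync)
        {
            return _snapshot ?? (_snapshot = _recipients.ToArray());
        }
    }
}
```

The alert engine would call GetSnapshot() for each alert and enumerate the result outside the lock, so sending alerts never holds up a clock-in or clock-out.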

HTH

Upvotes: 1

Drew Marsh

Reputation: 33379

There is much more to an implementation like this than just the parallel processing aspects, durability probably being paramount among them. Have you considered building this on an existing pub/sub technology such as Azure Service Bus topics or NServiceBus?

Upvotes: 0
