I'm writing an application which manages a collection that requires frequent enqueuing and dequeuing of items in a multithreaded environment. In a single-threaded setting a simple List would probably be enough, but the concurrent nature of the environment poses some issues.
Here's the summary:
The structure needs to have a bool TryAdd(T) method, preferably Add(TKey, TValue);
The structure needs to have a T TryRemove() method which removes an arbitrary item or, preferably, the first added one (essentially implementing a FIFO queue);
The structure needs to have a bool TryRemove(T) method, preferably Remove(TKey);
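For reference, the requested semantics can be pinned down with a minimal single-threaded sketch built on a plain List. The class name SimpleKeyedQueue and the Try-pattern signature TryRemove(out TKey, out TValue) are my own assumptions, not part of any library. Every operation here is O(n), and the class is not safe to share between threads, which is exactly the gap a concurrent version has to fill.

```csharp
using System.Collections.Generic;

// Hypothetical single-threaded reference version of the requested API.
// NOT thread-safe: it only pins down the expected semantics.
public class SimpleKeyedQueue<TKey, TValue> where TKey : notnull
{
    // Items in insertion order, so index 0 is always the oldest.
    private readonly List<KeyValuePair<TKey, TValue>> _items = new();
    private readonly EqualityComparer<TKey> _comparer = EqualityComparer<TKey>.Default;

    public int Count => _items.Count;

    // Rejects duplicate keys; O(n) because of the linear search.
    public bool TryAdd(TKey key, TValue value)
    {
        if (IndexOf(key) >= 0) return false;
        _items.Add(new KeyValuePair<TKey, TValue>(key, value));
        return true;
    }

    // FIFO removal of the oldest item; O(n) because List shifts the remaining elements.
    public bool TryRemove(out TKey key, out TValue value)
    {
        if (_items.Count == 0) { key = default!; value = default!; return false; }
        (key, value) = _items[0];
        _items.RemoveAt(0);
        return true;
    }

    // Removal of a specific item by key; also O(n).
    public bool TryRemove(TKey key)
    {
        int index = IndexOf(key);
        if (index < 0) return false;
        _items.RemoveAt(index);
        return true;
    }

    private int IndexOf(TKey key) => _items.FindIndex(kv => _comparer.Equals(kv.Key, key));
}
```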
So far I have three ideas, all with their issues:
    using System;
    using System.Collections.Concurrent;
    using System.Linq;

    internal class ConcurrentQueuedDictionary<TKey, TValue> where TKey : notnull
    {
        private readonly ConcurrentDictionary<TKey, TValue> _dictionary = new();
        private ConcurrentQueue<TKey> _queue = new(); // keys in insertion order
        private readonly object _locker = new();

        public bool TryAdd(TKey key, TValue value)
        {
            if (!_dictionary.TryAdd(key, value))
                return false;
            lock (_locker)
                _queue.Enqueue(key);
            return true;
        }

        // FIFO removal: dequeues the oldest key, then removes its value.
        public TValue TryRemove()
        {
            TKey key;
            lock (_locker)
            {
                if (!_queue.TryDequeue(out key))
                    return default(TValue);
            }
            if (!_dictionary.TryRemove(key, out TValue value))
                throw new InvalidOperationException(); // queue and dictionary out of sync
            return value;
        }

        // Removal by key: the whole queue is copied and rebuilt without that key (O(n)).
        public bool TryRemove(TKey key)
        {
            lock (_locker)
            {
                var copiedList = _queue.ToList();
                if (!copiedList.Remove(key))
                    return false;
                _queue = new ConcurrentQueue<TKey>(copiedList);
            }
            return _dictionary.TryRemove(key, out _);
        }
    }
but that requires a lock in TryRemove(TKey), because it has to make a full copy of the queue without the removed item while no other thread is reading from it. That in turn means the parameterless TryRemove() has to take the same lock, and that is meant to be an operation carried out often;
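    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;

    internal class ConcurrentQueuedDictionary<TKey, TValue> where TKey : notnull
    {
        private readonly ConcurrentDictionary<TKey, TValue> _dictionary = new();
        private readonly ConcurrentDictionary<int, TKey> _order = new(); // sequence number -> key
        private int _addOrder = 0;
        private int _removeOrder = 0;

        public bool TryAdd(TKey key, TValue value)
        {
            if (!_dictionary.TryAdd(key, value))
                return false;
            if (!_order.TryAdd(unchecked(Interlocked.Increment(ref _addOrder)), key))
                throw new InvalidOperationException(); // Operation faulted, mismatch of data in _order
            return true;
        }

        // FIFO removal: takes the key stored under the next sequence number.
        public TValue TryRemove()
        {
            TKey key;
            if (!(_order.Count > 0 && _order.TryRemove(unchecked(Interlocked.Increment(ref _removeOrder)), out key)))
                return default(TValue);
            if (!_dictionary.TryRemove(key, out TValue value))
                throw new InvalidOperationException();
            return value;
        }

        // Removal by key: O(n) scan of _order to find the key's sequence number.
        public bool TryRemove(TKey key)
        {
            var comparer = EqualityComparer<TKey>.Default;
            foreach (var item in _order)
            {
                if (comparer.Equals(item.Value, key) && _order.TryRemove(item.Key, out _))
                {
                    if (!_dictionary.TryRemove(key, out _))
                        throw new InvalidOperationException();
                    return true;
                }
            }
            return false;
        }
    }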
but I'm pretty sure just voicing this implementation has put me on a psychiatric watchlist somewhere, because keeping the two counters and the two dictionaries in sync without a lock is going to be a masochistic nightmare to get right;
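The ordering problem in this second idea doesn't even need a second thread to show up. The sketch below (OrderHoleDemo is a hypothetical name) replays its bookkeeping with local variables standing in for the _order, _addOrder and _removeOrder fields: once an item is removed by key, its sequence number is left as a hole, and the next FIFO TryRemove() lands on that hole and fails even though other items are still queued.

```csharp
using System.Collections.Concurrent;
using System.Threading;

// Single-threaded replay of the second idea's bookkeeping (_order, _addOrder,
// _removeOrder) to show the "hole" left behind by a removal by key.
public static class OrderHoleDemo
{
    public static (bool Dequeued, int Remaining) Run()
    {
        var order = new ConcurrentDictionary<int, string>();
        int addOrder = 0, removeOrder = 0;

        order.TryAdd(Interlocked.Increment(ref addOrder), "a"); // slot 1
        order.TryAdd(Interlocked.Increment(ref addOrder), "b"); // slot 2

        // TryRemove("a") by key deletes slot 1, but removeOrder is not advanced.
        order.TryRemove(1, out _);

        // FIFO TryRemove(): the counter becomes 1, slot 1 no longer exists,
        // so the call fails even though "b" is still queued.
        bool dequeued = order.Count > 0
            && order.TryRemove(Interlocked.Increment(ref removeOrder), out _);

        return (dequeued, order.Count);
    }
}
```

Run() reports a failed dequeue with one item still remaining. Each hole costs one failed dequeue (the counter does move past it on the next call), and with several threads racing on the two counters the ways to get out of sync only multiply.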
Any ideas? I'm kind of stumped by this issue, as I don't have the best grasp of concurrent collections. Do I need a custom IProducerConsumerCollection<T>? Is it even possible to have both arbitrary (or FIFO) and by-key access to the elements of a concurrent collection? Have any of you faced this before, or am I looking at the issue the wrong way?
Creating a concurrent structure like this by combining built-in concurrent collections would be close to impossible, provided of course that correctness is paramount and race conditions are strictly forbidden. The good news is that acquiring a lock a few thousand times per second is nowhere near the limit where contention starts to become an issue, provided that the operations inside the protected region are lightweight (their duration is measured in nanoseconds).
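To get a feel for the cost of a lock that nobody else is competing for, a rough single-threaded measurement can be sketched with Stopwatch. The LockCostDemo name is hypothetical, and this is not a rigorous benchmark (no warm-up, no statistics); the figure it reports depends on hardware and runtime, so it is only useful as an order-of-magnitude check against a budget of a few thousand acquisitions per second.

```csharp
using System.Diagnostics;

// Rough single-threaded estimate of the cost of an uncontended lock.
// Not a rigorous benchmark: no warm-up, no statistics, JIT effects included.
public static class LockCostDemo
{
    private static readonly object Gate = new();
    private static long _counter;

    public static double NanosecondsPerLock(int iterations = 1_000_000)
    {
        var stopwatch = Stopwatch.StartNew();
        for (int i = 0; i < iterations; i++)
        {
            lock (Gate) { _counter++; } // a trivially short protected region
        }
        stopwatch.Stop();
        // Convert total milliseconds to nanoseconds, averaged per iteration.
        return stopwatch.Elapsed.TotalMilliseconds * 1_000_000.0 / iterations;
    }
}
```

Printing LockCostDemo.NanosecondsPerLock() gives the average cost of one lock/unlock pair on the machine at hand; a dedicated benchmarking tool would give more trustworthy numbers.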
One way to achieve O(1) complexity for the enqueue, dequeue and remove-by-key operations is to combine a LinkedList<T> and a Dictionary<K,V>:
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using System.Runtime.InteropServices;

    /// <summary>
    /// Represents a thread-safe first-in-first-out (FIFO) collection of key/value pairs,
    /// where the key is unique.
    /// </summary>
    public class ConcurrentKeyedQueue<TKey, TValue>
    {
        // The linked list preserves the FIFO order; the dictionary maps each key to its
        // list node, so a removal by key doesn't have to search the list.
        // Both collections are protected by the same lock (the _queue instance).
        private readonly LinkedList<KeyValuePair<TKey, TValue>> _queue;
        private readonly Dictionary<TKey, LinkedListNode<KeyValuePair<TKey, TValue>>>
            _dictionary;

        public ConcurrentKeyedQueue(IEqualityComparer<TKey> comparer = default)
        {
            _queue = new();
            _dictionary = new(comparer);
        }

        public int Count { get { lock (_queue) return _queue.Count; } }

        public bool TryEnqueue(TKey key, TValue value)
        {
            lock (_queue)
            {
                // One hash lookup: gets a ref to the slot, adding a null entry if missing.
                ref var node = ref CollectionsMarshal
                    .GetValueRefOrAddDefault(_dictionary, key, out bool exists);
                if (exists) return false;
                node = new(new(key, value));
                _queue.AddLast(node);
                Debug.Assert(_queue.Count == _dictionary.Count);
                return true;
            }
        }

        public bool TryDequeue(out TKey key, out TValue value)
        {
            lock (_queue)
            {
                if (_queue.Count == 0) { key = default; value = default; return false; }
                var node = _queue.First;
                (key, value) = node.Value;
                _queue.RemoveFirst();
                bool removed = _dictionary.Remove(key);
                Debug.Assert(removed);
                Debug.Assert(_queue.Count == _dictionary.Count);
                return true;
            }
        }

        public bool TryTake(TKey key, out TValue value)
        {
            lock (_queue)
            {
                bool removed = _dictionary.Remove(key, out var node);
                if (!removed) { value = default; return false; }
                _queue.Remove(node); // O(1), since the node is already known
                (_, value) = node.Value;
                Debug.Assert(_queue.Count == _dictionary.Count);
                return true;
            }
        }

        public KeyValuePair<TKey, TValue>[] ToArray()
        {
            lock (_queue) return _queue.ToArray();
        }
    }
This combination is also used for creating LRU caches.
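As an illustration of that pattern, here is a minimal single-threaded LRU cache sketch on the same LinkedList + Dictionary pairing. The LruCache class, its capacity rule and its method names are hypothetical; for concurrent use it would need the same kind of lock as ConcurrentKeyedQueue.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical single-threaded LRU cache on the same LinkedList + Dictionary pairing:
// the list holds entries from least to most recently used, and the dictionary maps
// each key to its list node, so lookup, "touch" and eviction are all O(1).
public class LruCache<TKey, TValue> where TKey : notnull
{
    private readonly int _capacity;
    private readonly LinkedList<KeyValuePair<TKey, TValue>> _list = new();
    private readonly Dictionary<TKey, LinkedListNode<KeyValuePair<TKey, TValue>>> _map = new();

    public LruCache(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    public bool TryGet(TKey key, out TValue value)
    {
        if (!_map.TryGetValue(key, out var node)) { value = default!; return false; }
        _list.Remove(node);   // O(1): the node is already known
        _list.AddLast(node);  // now the most recently used entry
        value = node.Value.Value;
        return true;
    }

    public void Put(TKey key, TValue value)
    {
        if (_map.Remove(key, out var existing))
        {
            _list.Remove(existing); // replacing the value of an existing key
        }
        else if (_map.Count >= _capacity)
        {
            var oldest = _list.First!; // least recently used entry
            _list.RemoveFirst();
            _map.Remove(oldest.Value.Key);
        }
        _map[key] = _list.AddLast(new KeyValuePair<TKey, TValue>(key, value));
    }
}
```

For example, with capacity 2: Put("a"), Put("b"), TryGet("a") makes "a" the most recent entry, so a subsequent Put("c") evicts "b".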
You can measure lock contention in your own environment under load by using the Monitor.LockContentionCount property: "Gets the number of times there was contention when trying to take the monitor's lock." If the delta per second is a single-digit number, there is nothing to worry about.
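A sketch of such a measurement, assuming a synthetic load: the hypothetical ContentionProbe below runs a few tasks that take the same lock in a tight loop for a fixed duration and reports the LockContentionCount delta per second. The counter is process-wide (it includes contention on every Monitor lock in the process), and a tight loop is a far heavier load than a few thousand acquisitions per second, so this is a worst-case probe; in a real application you would rather sample the property periodically while the actual workload runs.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical load probe: several tasks take the same lock in a tight loop for a
// fixed duration, and the change in Monitor.LockContentionCount (a process-wide
// counter, available since .NET Core 3.0) is reported per second.
public static class ContentionProbe
{
    private static readonly object Gate = new();
    private static long _shared;

    public static double ContentionsPerSecond(int workers, TimeSpan duration)
    {
        long before = Monitor.LockContentionCount;
        long end = Stopwatch.GetTimestamp()
                   + (long)(duration.TotalSeconds * Stopwatch.Frequency);

        var tasks = new Task[workers];
        for (int i = 0; i < workers; i++)
        {
            tasks[i] = Task.Run(() =>
            {
                while (Stopwatch.GetTimestamp() < end)
                {
                    lock (Gate) { _shared++; } // lightweight protected region
                }
            });
        }
        Task.WaitAll(tasks);

        long delta = Monitor.LockContentionCount - before;
        return delta / duration.TotalSeconds;
    }
}
```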
For a version that doesn't use the CollectionsMarshal.GetValueRefOrAddDefault method, and so can be used on .NET versions older than .NET 6, see the first revision of this answer.
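This is not necessarily what that revision contains; as a hedged illustration only, here is one way TryEnqueue could be written without CollectionsMarshal, at the cost of two hash lookups (ContainsKey, then Add) instead of one. KeyedQueueCompat is a hypothetical name, and the method takes the list and dictionary as parameters only to keep the sketch self-contained; it locks on the list, like the class above.

```csharp
using System.Collections.Generic;

// Hypothetical stand-alone version of TryEnqueue that avoids
// CollectionsMarshal.GetValueRefOrAddDefault (a .NET 6+ API) by doing
// two dictionary operations instead of one: ContainsKey, then Add.
public static class KeyedQueueCompat
{
    public static bool TryEnqueue<TKey, TValue>(
        LinkedList<KeyValuePair<TKey, TValue>> queue,
        Dictionary<TKey, LinkedListNode<KeyValuePair<TKey, TValue>>> dictionary,
        TKey key, TValue value)
    {
        lock (queue) // same lock object the class above uses
        {
            if (dictionary.ContainsKey(key)) return false;
            var node = new LinkedListNode<KeyValuePair<TKey, TValue>>(
                new KeyValuePair<TKey, TValue>(key, value));
            dictionary.Add(key, node);
            queue.AddLast(node);
            return true;
        }
    }
}
```

On .NET Framework, TryTake would need a similar change (TryGetValue followed by Remove), because the Dictionary.Remove(key, out value) overload and the KeyValuePair deconstruction used in TryDequeue are only available on .NET Core 2.0 and later.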