Andrew

Reputation: 1621

How to implement queue conflation

Suppose I have a BlockingCollection queue and I am adding the following events:

A -> B1 -> C -> B2 -> B1

I only care about the last B1 event. In this scenario I want to discard the earlier B1 events ahead of it, but still process B2, since it has a different ID value than B1. With a BlockingCollection this does not seem achievable unless I can control when each B1 or B2 gets added, and I do not know when another B event will arrive.

I thought about keeping the B events in a separate data structure: a ConcurrentDictionary keyed by ID value. If two B events with the same ID value are added one after the other, then the first one is effectively discarded because it is overwritten in the dictionary. The problem is that I lose the ordering of all events, which is important. I still want events to be processed in the order shown above.

Any ideas?

Upvotes: 1

Views: 916

Answers (2)

Darragh

Reputation: 2656

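using System.Collections;
using System.Collections.Generic;

// Not thread-safe: a dictionary holds the latest value per key,
// and a queue of keys records the order in which keys first arrived.
class ConflatingQueue<TKey, TValue> : IEnumerable<TValue>
{
    private readonly Dictionary<TKey, TValue> dict = new Dictionary<TKey, TValue>();
    private readonly Queue<TKey> keys = new Queue<TKey>();

    public void Enqueue(TKey key, TValue value)
    {
        if (dict.ContainsKey(key))
        {
            // Key is already pending: overwrite its value, keep its position.
            dict[key] = value;
        }
        else
        {
            dict.Add(key, value);
            keys.Enqueue(key);
        }
    }

    // Throws InvalidOperationException if the queue is empty.
    public TValue Dequeue()
    {
        var key = keys.Dequeue();
        var value = dict[key];
        dict.Remove(key);
        return value;
    }

    // Enumerates pending values in key-arrival order without removing them.
    public IEnumerator<TValue> GetEnumerator()
    {
        foreach (var key in keys)
        {
            yield return dict[key];
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}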

Which could be used like this:

    public static void Main(string[] args)
    {
        //A -> B1 -> C -> B2 -> B1
        var cq = new ConflatingQueue<string, string>();
        cq.Enqueue("A", "A");
        cq.Enqueue("B1", "B1");
        cq.Enqueue("C", "C");
        cq.Enqueue("B2", "B2");
        cq.Enqueue("B1", "B1");

        // Prints A,B1,C,B2: the second B1 replaced the first but kept its position.
        Console.WriteLine(string.Join(",", cq));
    }

I'll leave making it multi-threaded as an exercise for the reader.
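
For anyone who wants a starting point for that exercise, here is a minimal sketch, assuming a single lock around both collections is acceptable for your throughput. The class name BlockingConflatingQueue and the method name Take are my own choices for illustration, not part of any library. Take blocks until an item is available, roughly like BlockingCollection.Take does.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical name: a lock-based variant of the dictionary + key-queue idea.
class BlockingConflatingQueue<TKey, TValue>
{
    private readonly object gate = new object();
    private readonly Dictionary<TKey, TValue> dict = new Dictionary<TKey, TValue>();
    private readonly Queue<TKey> keys = new Queue<TKey>();

    public void Enqueue(TKey key, TValue value)
    {
        lock (gate)
        {
            if (!dict.ContainsKey(key))
            {
                // First pending event for this key: remember its position.
                keys.Enqueue(key);
            }
            // Store the value, overwriting any pending value for the same key.
            dict[key] = value;
            // Wake one consumer that may be waiting in Take.
            Monitor.Pulse(gate);
        }
    }

    public TValue Take()
    {
        lock (gate)
        {
            // Re-check in a loop: another consumer may have taken the item first.
            while (keys.Count == 0)
            {
                Monitor.Wait(gate);
            }
            var key = keys.Dequeue();
            var value = dict[key];
            dict.Remove(key);
            return value;
        }
    }
}
```

This sketch has no cancellation, timeout, or "completed adding" support, so a consumer blocked in Take waits until a producer enqueues something.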

Upvotes: 3

Robert

Reputation: 371

Based on your comments, here are a few options:

  • Implement counters or flags such that only a single instance of a B event type can exist on the queue.
  • Set up a "B event" dictionary such that your key is the id of the B event and the value is the reference to the corresponding queue node. If a new B event enters the queue, do a lookup in the dictionary. If one already exists of that type, just swap out the queue node for a new one with the new B event or simply point the node's reference to the new B event instance. Since you're only changing the queue node belonging to the old B event instance, you won't lose the ordering of other items.
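
The second option could be sketched roughly as follows, assuming a LinkedList serves as the queue so that the dictionary can hold references to its nodes. NodeConflatingQueue is a made-up name, and the sketch is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical name: the dictionary maps each key to its node in the linked list.
class NodeConflatingQueue<TKey, TValue>
{
    private readonly LinkedList<KeyValuePair<TKey, TValue>> items =
        new LinkedList<KeyValuePair<TKey, TValue>>();
    private readonly Dictionary<TKey, LinkedListNode<KeyValuePair<TKey, TValue>>> nodes =
        new Dictionary<TKey, LinkedListNode<KeyValuePair<TKey, TValue>>>();

    public int Count
    {
        get { return items.Count; }
    }

    public void Enqueue(TKey key, TValue value)
    {
        var entry = new KeyValuePair<TKey, TValue>(key, value);
        LinkedListNode<KeyValuePair<TKey, TValue>> node;
        if (nodes.TryGetValue(key, out node))
        {
            // Same key already queued: point the existing node at the new event.
            node.Value = entry;
        }
        else
        {
            // New key: append a node and remember it for later lookups.
            nodes[key] = items.AddLast(entry);
        }
    }

    public TValue Dequeue()
    {
        var node = items.First;
        if (node == null)
        {
            throw new InvalidOperationException("Queue is empty.");
        }
        items.RemoveFirst();
        nodes.Remove(node.Value.Key);
        return node.Value.Value;
    }
}
```

With the events from the question this yields A, B1 (latest), C, B2. If the latest B1 should instead be processed where it arrived (A, C, B2, B1), the existing node could be removed with items.Remove(node) and a new one appended with AddLast; both are constant-time operations on a LinkedList.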

Upvotes: 0
