Rishabh Jain

Reputation: 634

IgniteQueue in Apache Ignite.NET

We are using Ignite.NET and don't have the option of using the Ignite Java API (team skills, technology affinity, etc.). We want to create a queuing mechanism so that we can process messages in a distributed fashion. IgniteQueue looks like the most suitable data structure, but it doesn't seem to be available in Ignite.NET. Could someone please suggest a solution for this scenario? Multiple producers each enqueue unique work items, and each item must be processed reliably by exactly one consumer.

For example, producers P1 and P2 (on different machines) put T1, T2, and T3 on the queue, and consumers C1, C2, and C3 (on different machines) read from it. T1 should be processed by only one of C1, C2, C3, and likewise T2 and T3 should each be processed exactly once by a single consumer.

Upvotes: 1

Views: 396

Answers (1)

Pavel Tupitsyn

Reputation: 8986

IgniteQueue is built on top of Ignite Cache, so yes, you can replicate the same functionality in .NET:

  1. Create a cache
  2. Use a Continuous Query as the consumer and call ICache.Remove to ensure that every item is processed only once (Remove returns true only for the caller that actually removed the entry)
  3. Add data to the cache on producers with Data Streamers, or just use ICache.Put / PutAll

Below is the code for the continuous query listener:

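using System.Collections.Generic;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Event;
using Apache.Ignite.Core.Resource;

class CacheEventListener<TK, TV> : ICacheEntryEventListener<TK, TV>
{
    private readonly string _cacheName;

    [InstanceResource]  // Injected automatically.
    private readonly IIgnite _ignite = null;

    // Resolved lazily on the first event, once _ignite has been injected.
    private ICache<TK, TV> _cache;

    public CacheEventListener(string cacheName)
    {
        _cacheName = cacheName;
    }

    public void OnEvent(IEnumerable<ICacheEntryEvent<TK, TV>> events)
    {
        _cache = _cache ?? _ignite.GetCache<TK, TV>(_cacheName);

        foreach (var entryEvent in events)
        {
            // Only new entries are work items. Remove returns true for exactly
            // one caller, so whoever wins the removal owns the item.
            if (entryEvent.EventType == CacheEntryEventType.Created && _cache.Remove(entryEvent.Key))
            {
                // Run consumer logic here - use another thread for heavy processing.
                // Consume is a placeholder for your own processing method.
                Consume(entryEvent.Value);
            }
        }
    }
}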

Then we deploy this to every node with a single call:

var consumer = new CacheEventListener<Guid, string>(cache.Name);
var continuousQuery = new ContinuousQuery<Guid, string>(consumer);
// Keep the returned handle: disposing it stops the continuous query.
var queryHandle = cache.QueryContinuous(continuousQuery);

As a result, OnEvent is called once per entry on the primary node for that entry, so there is one consumer per Ignite node. We can increase the effective number of consumers per node by offloading the actual consumer logic to other threads, for example with a BlockingCollection.
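The offloading idea can be sketched with plain .NET types, independent of Ignite. This is only a minimal sketch under assumptions: WorkerPool, its constructor parameters, and the default capacity are hypothetical names and values, not part of Ignite.NET. Note that an item handed to the pool has already been removed from the cache, so it lives only in memory until a worker finishes with it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper (not an Ignite.NET type): hands items received by the
// listener to a fixed number of worker threads.
public sealed class WorkerPool<T> : IDisposable
{
    private readonly BlockingCollection<T> _items;
    private readonly Task[] _workers;

    public WorkerPool(int workerCount, Action<T> consume, int capacity = 1024)
    {
        // Bounded queue: Enqueue blocks when 'capacity' items are pending.
        _items = new BlockingCollection<T>(capacity);

        _workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                // Blocks until an item arrives; the loop ends after
                // CompleteAdding() has been called and the queue is empty.
                foreach (var item in _items.GetConsumingEnumerable())
                {
                    consume(item);
                }
            }, TaskCreationOptions.LongRunning))
            .ToArray();
    }

    // Call this from OnEvent instead of running the consumer logic inline.
    public void Enqueue(T item)
    {
        _items.Add(item);
    }

    // Stops accepting items and waits for the workers to drain the queue.
    // If 'consume' threw, Task.WaitAll rethrows it as an AggregateException.
    public void Dispose()
    {
        _items.CompleteAdding();
        Task.WaitAll(_workers);
        _items.Dispose();
    }
}
```

With this in place, the listener would create something like new WorkerPool<string>(4, Consume) once and call pool.Enqueue(entryEvent.Value) where it currently calls Consume directly. The bounded capacity is a design choice: it applies back-pressure to the listener thread rather than letting the in-memory backlog grow without limit.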

One last thing: we have to come up with a unique cache key for every new entry. The simplest option is Guid.NewGuid(), but we can also use an atomic sequence (IAtomicSequence, obtained from IIgnite.GetAtomicSequence).
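A producer along these lines might look like the sketch below. The cache names "work-queue" and "work-queue-ordered", the sequence name "work-queue-seq", and the payload strings are assumptions made for illustration, and the node is started with the default configuration, which a real deployment would likely replace.

```csharp
using System;
using Apache.Ignite.Core;

public static class Producer
{
    public static void Main()
    {
        // Starts a node with the default configuration (assumption: the
        // consumers join the same cluster and use the same cache name).
        using (var ignite = Ignition.Start())
        {
            // Option 1: a random Guid as the key. No coordination needed.
            var cache = ignite.GetOrCreateCache<Guid, string>("work-queue");
            cache.Put(Guid.NewGuid(), "T1");

            // Option 2: a cluster-wide sequence as the key. The last argument
            // (create: true) creates the sequence if it does not exist yet.
            var sequence = ignite.GetAtomicSequence("work-queue-seq", 0, true);
            var ordered = ignite.GetOrCreateCache<long, string>("work-queue-ordered");
            ordered.Put(sequence.Increment(), "T2");
        }
    }
}
```

The two options use separate caches here only because their key types differ; a real queue would pick one key scheme and register the listener with matching type arguments.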

Upvotes: 1
