Dog Ears
Dog Ears

Reputation: 10035

Ensure a long running task is only fired once and subsequent request are queued but with only one entry in the queue

I have a compute intensive method Calculate that may run for a few seconds, requests come from multiple threads.

Only one Calculate should be executing, a subsequent request should be queued until the initial request completes. If there is already a request queued then the the subsequent request can be discarded (as the queued request will be sufficient)

There seems to be lots of potential solutions but I just need the simplest.

UPDATE: Here's my rudimentaryattempt:

private int _queueStatus;
private readonly object _queueStatusSync = new Object();

public void Calculate()
{
    lock(_queueStatusSync)
    {
        if(_queueStatus == 2) return;
        _queueStatus++;
        if(_queueStatus == 2) return;
    }
    for(;;)
    {
        CalculateImpl();
        lock(_queueStatusSync)
            if(--_queueStatus == 0) return;

    }
}

private void CalculateImpl()
{
    // long running process will take a few seconds...
}

Upvotes: 5

Views: 844

Answers (3)

Yuval Itzchakov
Yuval Itzchakov

Reputation: 149656

It sounds like a classic producer-consumer. I'd recommend looking into BlockingCollection<T>. It is part of the System.Collection.Concurrent namespace. On top of that you can implement your queuing logic.

You may supply to a BlockingCollection any internal structure to hold its data, such as a ConcurrentBag<T>, ConcurrentQueue<T> etc. The latter is the default structure used.

Upvotes: 0

i3arnon
i3arnon

Reputation: 116676

The simplest, cleanest solution IMO is using TPL Dataflow (as always) with a BufferBlock acting as the queue. BufferBlock is thread-safe, supports async-await, and more important, has TryReceiveAll to get all the items at once. It also has OutputAvailableAsync so you can wait asynchronously for items to be posted to the buffer. When multiple requests are posted you simply take the last and forget about the rest:

var buffer = new BufferBlock<Request>();
var task = Task.Run(async () =>
{
    while (await buffer.OutputAvailableAsync())
    {
        IList<Request> requests;
        buffer.TryReceiveAll(out requests);
        Calculate(requests.Last());
    }
});

Usage:

buffer.Post(new Request());
buffer.Post(new Request());

Edit: If you don't have any input or output for the Calculate method you can simply use a boolean to act as a switch. If it's true you can turn it off and calculate, if it became true again while Calculate was running then calculate again:

public bool _shouldCalculate;

public void Producer()
{
    _shouldCalculate = true;
}

public async Task Consumer()
{
    while (true)
    {
        if (!_shouldCalculate)
        {
            await Task.Delay(1000);
        }
        else
        {
            _shouldCalculate = false;
            Calculate();

        }
    }
}

Upvotes: 4

paparazzo
paparazzo

Reputation: 45106

A BlockingCollection that only takes 1 at a time
The trick is to skip if there are any items in the collection

I would go with the answer from I3aron +1
This is (maybe) a BlockingCollection solution

public static void BC_AddTakeCompleteAdding()
{
    using (BlockingCollection<int> bc = new BlockingCollection<int>(1))
    {

        // Spin up a Task to populate the BlockingCollection  
        using (Task t1 = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 100; i++)
            {
                if (bc.TryAdd(i))
                {
                    Debug.WriteLine("  add  " + i.ToString());
                }
                else
                {
                    Debug.WriteLine("  skip " + i.ToString());
                }

                Thread.Sleep(30);
            }
            bc.CompleteAdding();
        }))
        {

            // Spin up a Task to consume the BlockingCollection 
            using (Task t2 = Task.Factory.StartNew(() =>
            {
                try
                {
                    // Consume consume the BlockingCollection 
                    while (true)
                    {
                        Debug.WriteLine("take " + bc.Take());
                        Thread.Sleep(100);
                    }
                }
                catch (InvalidOperationException)
                {
                    // An InvalidOperationException means that Take() was called on a completed collection
                    Console.WriteLine("That's All!");
                }
            }))

                Task.WaitAll(t1, t2);
        }
    }
}

Upvotes: 0

Related Questions