Bercovici Adrian

Reputation: 9360

How to use TaskCompletionSource

I have a multi-producer, single-consumer scenario. I have opted for a common thread-safe resource into which all producers Enqueue items. However, I do not know how to efficiently make the consumer await new items when Dequeue-ing from that resource.

POCO

struct Sample
    {
        public int Id { get; set; }
        public  double Value { get; set; }
    }

Producers

class ProducerGroup
    {
        StorageQueue sink;
        int producerGroupSize;

        public ProducerGroup(StorageQueue _sink,int producers)
        {
            this.sink = _sink;
            this.producerGroupSize = producers;
        }
        public void StartProducers()
        {
            Task[] producers = new Task[producerGroupSize];
            for (int i = 0; i < producerGroupSize; i++)
            {
                // Declared inside the loop so each task captures its own copy of the index.
                int currentProducer = i;
                producers[i] = Task.Run(async () =>
                  {
                      int cycle = 0;
                      while (true)
                      {
                          if (cycle > 5)
                          {
                              cycle = 0;
                          }
                          Sample localSample = new Sample { Id = currentProducer, Value = cycle++ };
                          await Task.Delay(1000);
                          this.sink.Enqueue(localSample);
                      }
                  });
            }

        }
    }

Storage

class StorageQueue
    {
        private TaskCompletionSource<Sample> tcs;
        private object @lock = new object();

        private Queue<Sample> queue;

        public static StorageQueue CreateQueue(int? size = null)
        {
            return new StorageQueue(size);
        }

        public StorageQueue(int? size)
        {
            if (size.HasValue)
            {
                this.queue = new Queue<Sample>(size.Value);
            }
            else
                this.queue = new Queue<Sample>();
        }

        public void Enqueue(Sample value)
        {
            lock (this.@lock)
            {
                this.queue.Enqueue(value);
                tcs = new TaskCompletionSource<Sample>();
                tcs.SetResult(this.queue.Peek());
            }
        }
        public async Task<Sample> DequeueAsync()
        {
            var result=await this.tcs.Task;
            this.queue.Dequeue();
            return result;
        }
    }

Consumer

class Consumer
    {
        private StorageQueue source;
        public Consumer(StorageQueue _source)
        {
            this.source = _source;
        }
        public async Task ConsumeAsync()
        {
            while (true)
            {
                Sample arrivedSample = await this.source.DequeueAsync();  //how should i implement this ?
                Console.WriteLine("Just Arrived");
            }
        }
    }

As you can see in the Consumer class, I would like to wrap my Storage's Dequeue method in a Task so that I can await it in my Consumer. The only reason I used TaskCompletionSource is to be able to communicate between the Dequeue and Enqueue methods in the Storage.

I do not know if I need to reinitialize the tcs, but I suppose I do, since I want a new Task after every Enqueue operation.

I also reinitialized the tcs inside the lock, since I want that particular instance to set the result.

How should I proceed with this? Is the implementation OK? Would System.Reactive offer a better option?

Upvotes: 0

Views: 347

Answers (1)

Nick

Reputation: 5042

I think I see several issues with your implementation:

  1. If you call Enqueue() multiple times without calling DequeueAsync(), you will lose the TaskCompletionSource and have only the last one. Then, calling DequeueAsync() will not yield correct results after the first call.

To fix this, you will need a queue of TaskCompletionSource instances. Take a look here and further here.

Better, use SemaphoreSlim.WaitAsync() in DequeueAsync to wait properly if the queue is empty.

  2. You also don't protect your queue in DequeueAsync().
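
To make both points concrete, here is a minimal sketch of how the SemaphoreSlim approach might look. It is one possible shape rather than a definitive implementation. The field names gate and available, the optional CancellationToken parameter, and the small Main driver are my own assumptions and are not part of the code in the question. The semaphore counts the items that are ready, so several Enqueue() calls in a row are not lost, and the queue is only touched inside the lock.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

struct Sample
{
    public int Id { get; set; }
    public double Value { get; set; }
}

class StorageQueue
{
    // Hypothetical field names, chosen for this sketch only.
    private readonly object gate = new object();
    private readonly Queue<Sample> queue = new Queue<Sample>();

    // Counts the items currently available; starts at zero so the first wait blocks.
    private readonly SemaphoreSlim available = new SemaphoreSlim(0);

    public void Enqueue(Sample value)
    {
        lock (gate)
        {
            queue.Enqueue(value);
        }
        // One Release per item: signals accumulate even if nobody is waiting yet.
        available.Release();
    }

    public async Task<Sample> DequeueAsync(CancellationToken cancellationToken = default)
    {
        // Completes at once if an item is pending, otherwise waits without blocking a thread.
        await available.WaitAsync(cancellationToken).ConfigureAwait(false);
        lock (gate)
        {
            // Safe: the semaphore guarantees at least one item is in the queue here.
            return queue.Dequeue();
        }
    }
}

static class Program
{
    static async Task Main()
    {
        var storage = new StorageQueue();

        // Enqueue several items before any dequeue, the case that broke the original.
        for (int i = 0; i < 3; i++)
        {
            storage.Enqueue(new Sample { Id = i, Value = i });
        }

        for (int i = 0; i < 3; i++)
        {
            Sample arrived = await storage.DequeueAsync();
            Console.WriteLine($"Just arrived: Id={arrived.Id}");
        }
    }
}
```

The Consumer from the question can keep calling DequeueAsync() in its loop unchanged. Releasing the semaphore after leaving the lock keeps the critical section short; the consumer still cannot observe an empty queue, because the count is only incremented once the item is already stored.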

Upvotes: 1
