tjw
tjw

Reputation: 93

Processing values from a Thread Executing a yield return Function

I am new to multi threading and I am trying make an enumerator in a way that I receive a value as it is ready, but that it continues to process in the background.

I want to be able to use the final class like this:

 while (await asyncEnumerator.MoveNextAsync())
 {
       T result = asyncEnum.Current;
       //Do Something
 }

But I want it to continue getting the next result while I am doing stuff in the while loop.

Comment: It can't be processed in parallel since each result depends on the previous result.

I thought of maybe having a queue with the results and having a thread work on getting the results while returning the first result in the queue but I can't get this to work. This is my try, but I don't know what to do MoveNext():

I Updated The Code

public class AsyncEnumerator<T>
{
    private object _sync = new object();

    private readonly Queue<T> _queue;
    private readonly IEnumerable<T> _enumerable;

    public AsyncEnumerator(IEnumerable<T> enumerable)
    {
        _enumerable = enumerable;
        _queue = new Queue<T>();
    }

    /// <summary>
    /// Start getting results when requested the firstTime
    /// </summary>
    private bool _startFlag;

    public async Task<bool> MoveNextAsync()
    {
        if (_finished)
        {
            return false;
        }
        if (!_startFlag)
        {
            _startFlag = true;
            SetResultsAsync();
        }
        if (_queue.Count > 0)
        {
            Current = _queue.Dequeue();
            return true;
        }
        //What here?
        return await Task.Run(()=> true);
    }

    private T _current;
    public T Current
    {
        get
        {
            lock (_sync)
            {
                return _current;
            }
        }
        private set { _current = value; }
    }

    private bool _finished;

    private async Task SetResultsAsync()
    {
        IEnumerator<T> enumerator = _enumerable.GetEnumerator();
        bool moveNext = await Task.Run(() => enumerator.MoveNext());
        while (moveNext)
        {
            _queue.Enqueue(enumerator.Current);
            moveNext = await Task.Run(() => enumerator.MoveNext());
        }
        _finished = true;
    }
}

Obviously if there is a better way to do this I would like to know, I am new to multi threading.

Upvotes: 2

Views: 364

Answers (1)

Stephen Cleary
Stephen Cleary

Reputation: 457017

But I want it to continue getting the next result while I am doing stuff in the while loop.

A plain async enumerator is the wrong solution here. Enumerators are a "pull" technology; they only do things when the consuming code requests the next item.

Think about your problem in terms of a "producer" and a "consumer". If the producer only does work when the consumer requests it to, then you have an enumerator scenario. In your case, you want the producer to be independently producing items, so an enumerator is not going to fit properly.

One option is reactive extensions; an observable represents a sequence of values similar to an enumerator, except that observables are a "push" technology. So the producer will push values to the consumer, which reacts to the new values coming in (hence the name Reactive).

It sounds like Rx would be a great fit for you, but it does have a fairly high learning curve.

A simpler option may be to use a more traditional producer/consumer queue. Since you want to consume it asynchronously, you're going to want an async-compatible producer/consumer queue. BufferBlock<T> is one solution, as is the AsyncProducerConsumerQueue<T> from my AsyncEx library.

In both cases, you'd kick off the producer and then consume:

var queue = new BufferBlock<T>();

// Kick off producer
var producer = Task.Run(() =>
{
  while (...)
  {
    T value = ...;
    queue.Post(value);
  }
  queue.Complete();
});

// Consumer code
while (await queue.OutputAvailableAsync())
{
  var value = await queue.ReceiveAsync();
  ...
}

Eventually, you will want to await that producer task, so that you can properly detect any exceptions from your producer code.

Upvotes: 4

Related Questions