Daan Timmer

Reputation: 15047

Asynchronous waiting for data changed

I've got a synchronisation question: I have a thread that generates data and puts it in a list, and other threads that need to either sample the "current latest value" or "wait for new data, and then get the latest value".

My class that generates data roughly looks like this:

    public class GeneratingThread
    {
        public delegate void Callback();

        public event EventHandler DataChanged;

        private List<int> dataPayload = new List<int>();

        /* other functions */

        public int GetLastData()
        {
            /* synchronization */
            return dataPayload.Last();
        }

        private void AsyncDataSampler(int data) /* Generates data in intervals of ~1ms */
        {
            /* synchronization */
            dataPayload.Add(data);

            if (DataChanged != null)
                DataChanged(this, EventArgs.Empty);
        }
    }

And the class that consumes data looks like this:

    public class ConsumingThread
    {
        int WaitForMeasurement(GeneratingThread otherThread)
        {
            AutoResetEvent autoResetEvent = new AutoResetEvent(false);

            EventHandler waitForEvent = (Object sender, EventArgs args) => autoResetEvent.Set();

            otherThread.DataChanged += waitForEvent;

            autoResetEvent.WaitOne();

            otherThread.DataChanged -= waitForEvent;

            /* do something with the data */
            return otherThread.GetLastData();
        }
    }

(Note: my background is in embedded C++ and I only know some C#, so forgive me if the C# looks kind of bad. Here to learn ;-)

Upvotes: 3

Views: 424

Answers (2)

Soonts

Reputation: 21936

Based on the comments, I would do something like this:

    /// <summary>This class is thread safe, you can call methods from any thread.</summary>
    class Measures
    {
        readonly object syncRoot = new object();
        int? lastValue = null;
        readonly List<int> buffer = new List<int>();

        /// <summary>Provide a new value</summary>
        public void produce( int val )
        {
            lock( syncRoot )
            {
                lastValue = val;
                buffer.Add( val );
                Monitor.PulseAll( syncRoot );
            }
        }

        /// <summary>Get a single last value, or null if there's none currently.</summary>
        public int? getLast()
        {
            lock( syncRoot )
                return lastValue;
        }

        /// <summary>Block the calling thread waiting for the next result.</summary>
        public int getNext()
        {
            lock( syncRoot )
            {
                Monitor.Wait( syncRoot );
                return lastValue.Value;
            }
        }

        /// <summary>Get all measures accumulated so far.</summary>
        public List<int> getAll()
        {
            lock( syncRoot )
            {
                List<int> result = new List<int>( buffer.Count );
                result.AddRange( buffer );
                return result;
            }
        }
    }

The Monitor class (the same one the lock statement compiles into) is implemented in the .NET runtime and is often faster than events.
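As an illustration of the Monitor.Wait / Monitor.PulseAll pattern, here is a minimal sketch of a variant that bounds the wait with a timeout and uses a version counter so the consumer only returns when a value was really produced after it started waiting. The names MeasuresWithTimeout, TryGetNext and MonitorDemo are hypothetical and not part of the answer's class; the buffer is left out to keep it short.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical variant of the Measures class: same lock object and PulseAll,
// plus a version counter and a timeout on the consumer side.
class MeasuresWithTimeout
{
    readonly object syncRoot = new object();
    int lastValue;
    long version; // incremented on every Produce call

    public void Produce(int val)
    {
        lock (syncRoot)
        {
            lastValue = val;
            version++;
            Monitor.PulseAll(syncRoot); // wake every waiting consumer
        }
    }

    // Returns true with the next value, or false if nothing arrived in time.
    public bool TryGetNext(TimeSpan timeout, out int value)
    {
        lock (syncRoot)
        {
            long seen = version;
            Stopwatch elapsed = Stopwatch.StartNew();
            while (version == seen)
            {
                TimeSpan remaining = timeout - elapsed.Elapsed;
                if (remaining <= TimeSpan.Zero)
                {
                    value = 0;
                    return false;
                }
                // Releases the lock while waiting and reacquires it before returning.
                Monitor.Wait(syncRoot, remaining);
            }
            value = lastValue;
            return true;
        }
    }
}

static class MonitorDemo
{
    // One producer thread publishes a single value while the caller waits for it.
    public static int Run()
    {
        var measures = new MeasuresWithTimeout();
        var producer = new Thread(() =>
        {
            Thread.Sleep(50);       // assumed delay so the consumer is already waiting
            measures.Produce(42);
        });
        producer.Start();
        bool ok = measures.TryGetNext(TimeSpan.FromSeconds(5), out int v);
        producer.Join();
        return ok ? v : -1;
    }
}
```

Calling MonitorDemo.Run() from a Main method exercises the hand-off once. The timeout is a design choice for callers that must not block forever if the producer stops; whether that matters depends on the application.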


If you have many consuming threads, more than there are CPU cores, you probably don’t want to block them. Here's a non-blocking version:

    /// <summary>This class is thread safe, you can call methods from any thread.</summary>
    class Measures
    {
        readonly object syncRoot = new object();
        int? lastValue = null;
        TaskCompletionSource<int> tcsConsumeNext = null;
        readonly List<int> buffer = new List<int>();

        /// <summary>Provide a new value</summary>
        public void produce( int val )
        {
            TaskCompletionSource<int> tcs;
            lock( syncRoot )
            {
                lastValue = val;
                buffer.Add( val );
                tcs = tcsConsumeNext;
                tcsConsumeNext = null;
            }
            tcs?.TrySetResult( val );
        }

        /// <summary>Get a single last value, or null if there's none.</summary>
        public int? getLast()
        {
            lock( syncRoot )
                return lastValue;
        }

        /// <summary>Get a task which will complete with the next value.</summary>
        public Task<int> getNext()
        {
            lock( syncRoot )
            {
                if( null == tcsConsumeNext )
                {
                    // Some setup is required to make sure the producing thread doesn't run the continuations.
                    tcsConsumeNext = new TaskCompletionSource<int>( TaskCreationOptions.RunContinuationsAsynchronously );
                }
                return tcsConsumeNext.Task;
            }
        }

        /// <summary>Get all measures accumulated so far.</summary>
        public List<int> getAll()
        {
            lock( syncRoot )
            {
                List<int> result = new List<int>( buffer.Count );
                result.AddRange( buffer );
                return result;
            }
        }
    }
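To show how a consumer might await the task-based version, here is a minimal sketch. AsyncMeasures is a condensed, renamed copy of the class (only the next-value path), and AsyncDemo with its NextOrTimeout and RunAsync methods is a hypothetical helper added for illustration.

```csharp
using System;
using System.Threading.Tasks;

// Condensed copy of the non-blocking class: only the "next value" path is kept.
class AsyncMeasures
{
    readonly object syncRoot = new object();
    TaskCompletionSource<int> tcsConsumeNext;

    public void Produce(int val)
    {
        TaskCompletionSource<int> tcs;
        lock (syncRoot)
        {
            tcs = tcsConsumeNext;
            tcsConsumeNext = null;
        }
        // Completed outside the lock; continuations run on the thread pool.
        tcs?.TrySetResult(val);
    }

    public Task<int> GetNext()
    {
        lock (syncRoot)
        {
            if (tcsConsumeNext == null)
                tcsConsumeNext = new TaskCompletionSource<int>(
                    TaskCreationOptions.RunContinuationsAsynchronously);
            return tcsConsumeNext.Task;
        }
    }
}

static class AsyncDemo
{
    // Awaits the next value without blocking a thread; null means the timeout won.
    public static async Task<int?> NextOrTimeout(AsyncMeasures m, TimeSpan timeout)
    {
        Task<int> next = m.GetNext();
        Task finished = await Task.WhenAny(next, Task.Delay(timeout));
        return finished == next ? await next : (int?)null;
    }

    // Registers interest first, then produces, so the awaited task sees the value.
    public static async Task<int> RunAsync()
    {
        var m = new AsyncMeasures();
        Task<int> next = m.GetNext();
        m.Produce(7);
        return await next;
    }
}
```

All consumers that call GetNext between two Produce calls share the same task, so they all observe the same value. A consumer that calls GetNext only after a value was produced waits for the following one.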

Upvotes: 0

Marc Gravell

Reputation: 1062745

Since (per the comments) you actually need all the data, not just the latest value, this seems like a perfect fit for Channel<T>. Channel<T> is explicitly designed for fully asynchronous (async/await) producer/consumer scenarios, and has configuration options for bounded vs unbounded capacity, threading models, multiple vs single producer/consumer, and what to do when a bounded channel is full (delay, drop old, drop new, etc).

So basically: consider using Channel<T>. It is available on NuGet, with support down to netstandard1.3 and net46: https://www.nuget.org/packages/System.Threading.Channels/

A good usage overview is available here: https://www.stevejgordon.co.uk/an-introduction-to-system-threading-channels
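For a rough idea of what this looks like, here is a minimal sketch with one producer and one consumer on an unbounded channel. The ChannelDemo class, the five-item loop and the 1 ms delay are assumptions made for illustration; only the System.Threading.Channels calls come from the library.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

static class ChannelDemo
{
    public static async Task<List<int>> RunAsync()
    {
        // Unbounded: writes never wait. SingleReader/SingleWriter are optimisation hints.
        Channel<int> channel = Channel.CreateUnbounded<int>(
            new UnboundedChannelOptions { SingleReader = true, SingleWriter = true });

        // Producer: stands in for the sampling thread from the question.
        Task producer = Task.Run(async () =>
        {
            for (int i = 0; i < 5; i++)
            {
                channel.Writer.TryWrite(i); // succeeds on an unbounded channel until Complete
                await Task.Delay(1);
            }
            channel.Writer.Complete();      // tells the reader no more data will come
        });

        // Consumer: waits asynchronously, then drains whatever is available.
        var received = new List<int>();
        while (await channel.Reader.WaitToReadAsync())
        {
            while (channel.Reader.TryRead(out int item))
                received.Add(item);
        }

        await producer;
        return received; // every item, in the order it was written
    }
}
```

If the consumer may fall behind and only recent samples matter, a bounded channel is one option, for example Channel.CreateBounded<int>(new BoundedChannelOptions(100) { FullMode = BoundedChannelFullMode.DropOldest }); the capacity of 100 is an arbitrary placeholder.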

Upvotes: 2
