Reputation: 15047
I have a synchronisation question. One thread generates data and appends it to a list. Other threads need to either sample the current latest value, or wait for new data and then get the latest value.
My class that generates data roughly looks like this:
    public class GeneratingThread
    {
        public delegate void Callback();
        public event EventHandler DataChanged;
        private List<int> dataPayload = new List<int>();

        /* other functions */

        public int GetLastData()
        {
            /* synchronization */
            return dataPayload.Last();
        }

        private void AsyncDataSampler(int data) /* Generates data in intervals of ~1ms */
        {
            /* synchronization */
            dataPayload.Add(data);
            if (DataChanged != null)
                DataChanged(this, EventArgs.Empty);
        }
    }
And the class that consumes data looks like this:
    public class ConsumingThread
    {
        int WaitForMeasurement(GeneratingThread otherThread)
        {
            AutoResetEvent autoResetEvent = new AutoResetEvent(false);
            EventHandler waitForEvent = (Object sender, EventArgs args) => autoResetEvent.Set();
            otherThread.DataChanged += waitForEvent;
            autoResetEvent.WaitOne();
            otherThread.DataChanged -= waitForEvent;
            /* do something with the data */
            return otherThread.GetLastData();
        }
    }
Do I need to explicitly add and remove the EventHandler waitForEvent to and from the otherThread.DataChanged handler? Or does the object, when deleted/cleaned up, remove itself from the event list?
Is there a more C# kind of way to do these things?
(Note: my background is in embedded C++ and I know some C#, so forgive me if the C# looks kind of bad. Here to learn ;-)
Upvotes: 3
Views: 424
Reputation: 21936
Based on the comments, I would do something like this:
    using System.Collections.Generic;
    using System.Threading;

    /// <summary>This class is thread safe, you can call methods from any thread.</summary>
    class Measures
    {
        readonly object syncRoot = new object();
        int? lastValue = null;
        readonly List<int> buffer = new List<int>();

        /// <summary>Provide a new value</summary>
        public void produce( int val )
        {
            lock( syncRoot )
            {
                lastValue = val;
                buffer.Add( val );
                // Wake every thread currently blocked in getNext()
                Monitor.PulseAll( syncRoot );
            }
        }

        /// <summary>Get a single last value, or null if there's none currently.</summary>
        public int? getLast()
        {
            lock( syncRoot )
                return lastValue;
        }

        /// <summary>Block the calling thread waiting for the next result.</summary>
        public int getNext()
        {
            lock( syncRoot )
            {
                // Wait releases the lock while blocked and reacquires it before returning
                Monitor.Wait( syncRoot );
                return lastValue.Value;
            }
        }

        /// <summary>Get all measures accumulated so far.</summary>
        public List<int> getAll()
        {
            lock( syncRoot )
            {
                List<int> result = new List<int>( buffer.Count );
                result.AddRange( buffer );
                return result;
            }
        }
    }
The Monitor class (the same one the lock statement compiles into) is implemented inside the .NET runtime, and is often faster than event objects such as AutoResetEvent.
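If you want the blocking wait to give up after a while, or to be robust against a wake-up that did not come from a new value, one option is to guard the wait with a counter. The following is only a sketch under my own assumptions: the SequencedMeasures class, its sequence field and the TryGetNext method are hypothetical names that are not part of the code above, and the timeout is assumed to be a modest finite value.

```csharp
using System;
using System.Threading;

/// <summary>Hypothetical variant: a sequence counter guards the wait.</summary>
class SequencedMeasures
{
    readonly object syncRoot = new object();
    int lastValue;
    long sequence; // number of values produced so far

    /// <summary>Provide a new value and wake all waiting consumers.</summary>
    public void Produce( int val )
    {
        lock( syncRoot )
        {
            lastValue = val;
            sequence++;
            Monitor.PulseAll( syncRoot );
        }
    }

    /// <summary>Block until a value produced after this call arrives, or the timeout elapses.</summary>
    public bool TryGetNext( TimeSpan timeout, out int value )
    {
        lock( syncRoot )
        {
            long seen = sequence;
            DateTime deadline = DateTime.UtcNow + timeout;
            // Re-check the counter after every wake-up instead of trusting the pulse.
            while( sequence == seen )
            {
                TimeSpan remaining = deadline - DateTime.UtcNow;
                if( remaining <= TimeSpan.Zero )
                {
                    value = 0;
                    return false; // timed out, nothing new was produced
                }
                Monitor.Wait( syncRoot, remaining );
            }
            value = lastValue;
            return true;
        }
    }
}
```

A consumer would call something like `if( measures.TryGetNext( TimeSpan.FromSeconds( 1 ), out int v ) ) { /* use v */ }` and decide for itself what a timeout means.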
If you have many consuming threads, more than CPU cores, you probably don’t want to block threads. Here's a non-blocking version:
    using System.Collections.Generic;
    using System.Threading.Tasks;

    /// <summary>This class is thread safe, you can call methods from any thread.</summary>
    class Measures
    {
        readonly object syncRoot = new object();
        int? lastValue = null;
        TaskCompletionSource<int> tcsConsumeNext = null;
        readonly List<int> buffer = new List<int>();

        /// <summary>Provide a new value</summary>
        public void produce( int val )
        {
            TaskCompletionSource<int> tcs;
            lock( syncRoot )
            {
                lastValue = val;
                buffer.Add( val );
                // Take the pending source, so the next getNext() call creates a fresh one
                tcs = tcsConsumeNext;
                tcsConsumeNext = null;
            }
            // Complete the task outside the lock
            tcs?.TrySetResult( val );
        }

        /// <summary>Get a single last value, or null if there's none.</summary>
        public int? getLast()
        {
            lock( syncRoot )
                return lastValue;
        }

        /// <summary>Get a task which will complete with the next value.</summary>
        public Task<int> getNext()
        {
            lock( syncRoot )
            {
                if( null == tcsConsumeNext )
                {
                    // Some setup is required to make sure the producing thread doesn't run the continuations.
                    tcsConsumeNext = new TaskCompletionSource<int>( TaskCreationOptions.RunContinuationsAsynchronously );
                }
                return tcsConsumeNext.Task;
            }
        }

        /// <summary>Get all measures accumulated so far.</summary>
        public List<int> getAll()
        {
            lock( syncRoot )
            {
                List<int> result = new List<int>( buffer.Count );
                result.AddRange( buffer );
                return result;
            }
        }
    }
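For completeness, here is roughly how a consumer might await such a getNext(). This is a sketch, not part of the class above: ConsumerSketch and SumNextAsync are hypothetical names, and the method takes a delegate so it can be tried without the rest of the code. Note that each call only yields the first value produced after that call, so values produced between two calls are not delivered this way (use getAll() if you need every value).

```csharp
using System;
using System.Threading.Tasks;

static class ConsumerSketch
{
    /// <summary>Await the next value 'count' times and add the results up.</summary>
    public static async Task<int> SumNextAsync( Func<Task<int>> getNext, int count )
    {
        int sum = 0;
        for( int i = 0; i < count; i++ )
        {
            // No thread is blocked while waiting; the method resumes when the task completes.
            sum += await getNext().ConfigureAwait( false );
        }
        return sum;
    }
}
```

With the class above, the call would look like `int sum = await ConsumerSketch.SumNextAsync( measures.getNext, 10 );`.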
Upvotes: 0
Reputation: 1062745
Since (per the comments) you actually need all the data, not just the latest value, this seems like a perfect fit for Channel<T>. Channel<T> is explicitly designed for fully asynchronous (async/await) producer/consumer scenarios, and has configuration options for bounded vs unbounded capacity, threading models, multiple vs single producer/consumer, and what to do when a bounded channel is full (delay, drop old, drop new, etc).
So basically: consider using Channel<T>. It is available on NuGet, including support down to netstandard1.3 and net46: https://www.nuget.org/packages/System.Threading.Channels/
A good usage overview is available here: https://www.stevejgordon.co.uk/an-introduction-to-system-threading-channels
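To give a rough idea of the shape, here is a minimal sketch with one producer and one consumer. The ChannelSketch class, the RunAsync method and the capacity of 1024 are my own illustrative choices, not something from your code. Also keep in mind that each item goes to exactly one reader, so several consumers that all need every value would each need their own channel.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

static class ChannelSketch
{
    /// <summary>Produce 'count' integers on one task and collect them on the caller.</summary>
    public static async Task<List<int>> RunAsync( int count )
    {
        // Bounded channel: once 1024 items are unread, the oldest one is dropped.
        var channel = Channel.CreateBounded<int>( new BoundedChannelOptions( 1024 )
        {
            FullMode = BoundedChannelFullMode.DropOldest,
            SingleWriter = true,
            SingleReader = true
        } );

        Task producer = Task.Run( async () =>
        {
            for( int i = 0; i < count; i++ )
                await channel.Writer.WriteAsync( i );
            // Tell the reader no more data is coming.
            channel.Writer.Complete();
        } );

        var received = new List<int>();
        // WaitToReadAsync returns false once the channel is completed and drained.
        while( await channel.Reader.WaitToReadAsync() )
        {
            while( channel.Reader.TryRead( out int item ) )
                received.Add( item );
        }

        await producer;
        return received;
    }
}
```

As long as count stays at or below the capacity nothing is dropped, and the consumer sees the values in the order they were written.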
Upvotes: 2