Reputation: 51
How to transform the following callback-driven code to async/await pattern PROPERLY:
public class DeviceWrapper
{
// external device which provides real time stream of data
private InternalDevice device = new InternalDevice();
private List<int> accumulationBuffer = new List<int>();
public void StartReceiving()
{
// the following callback invocations might by synchronized by main
// UI message pump, particular window message pump
// or some other way
device.Synchronization = Synchronization.UI;
device.DataAvailable += DataAvailableHandler;
device.ReceivingStoppedOrErrorOccured += StopHandler;
device.Start();
}
private void DataAvailableHandler(object sender, DataEventArgs e)
{
// Filter data from e.Data and accumulate to accumulationBuffer field.
// If certail condition is met, signal pending task (if there is any)
//as complete return to the awaiting caller accumulationBuffer or perhaps temporary buffer created from accumulationBuffer
// in order to make it available to the caller.
// Handle also requested cancellation.
}
public Task<byte[]> GetData(CancellationToken token)
{
// create task returning data filtered and accumulated in DataAvailableHandler
}
}
// usage:
async void Test()
{
DeviceWrapper w = new DeviceWrapper();
w.StartReceiving();
while(true)
{
byte[] filteredData = await w.GetData(CancellationToken.Null);
Use(filteredData);
}
}
I have sought inspiration to solve this by reading .NET StreamReader class source, but it made me even more confused.
Thank you experts for any advice!
Upvotes: 2
Views: 1050
Reputation: 30474
If you use your async await properly your code would be much easier:
First of all:
<TResult
> instead of TResultNow implementing your example. There are several methods to solve this, but I think this typically is a producer - consumer pattern: we have an object that produces things in a tempo independant from another object that consumes them.
You can create this yourself, using semaphores to signal new data, but .NET already has something for this:
System.Threading.Tasks.DataFlow.BufferBlock.
You'll need to download a microsoft nuget package. See the remarks in MSDN description of BufferBlock.
A BufferBlock is something you send objects of type T to, while another task waits for objects of type T to arrive. Fully supports async / await.
Sender side:
<T
> where T is the type it sends.<T
> as property.Consumer side:
<T
> implements als ISourceBlock<T
>Ok, lets put it all together:
public class DeviceWrapper
{
// external device which provides real time stream of data
private InternalDevice device = new InternalDevice();
// internal buffer replaced by the bufferBlock
BufferBlock<byte> bufferBlock = new BufferBlock<byte>()
public void StartReceiving() {...}
private async void DataAvailableHandler(object sender, DataEventArgs e)
{
// get the input and convert it to a byte
// post the byte to the buffer block asynchronously
byte byteToSend = ...
await this.bufferBlock.SendAsync(byteToSend);
}
public async Task<IEnumerable<byte>> GetData(CancellationToken token)
{
List<byte> receivedBytes = new List<byte>()
while (await this.BufferBlock.OutputAvailableAsync(token))
{ // a byte is available
byte b = await this.bufferBlock.ReceiveAsync(token);
receivedBytes.Add(b);
if (receivedBytes.Count > ...)
{
return receivedBytes;
}
// else: not enough bytes received yet, wait for more
}
}
}
async Task Test(CancellationToken token)
{
DeviceWrapper w = new DeviceWrapper();
w.StartReceiving();
while(NoStopRequested)
{
token.ThrowIfCancellationrequested();
var filteredData = await w.GetData(token);
Use(filteredData);
}
}
There is a lot more to tell with BufferBlocks, especially on how to stop them neatly if no data is available anymore MSDN has several examples about this. See the chapter about DataFlow in parallel library
https://msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx
Upvotes: 1
Reputation: 149558
You're looking for TaskCompletionSource<byte[]>
. This is an approximation of what it would look like:
public Task<byte[]> GetData(CancellationToken token)
{
cancellationToken.ThrowIfCancellationRequested;
var tcs = new TaskCompletionSource<byte[]>();
DataEventHandler dataHandler = null;
dataHandler = (o, e) =>
{
device.DataAvailable -= dataHandler;
tcs.SetResult(e.Data);
}
StopEventHandler stopHandler = null;
stopHandler = (os, se) =>
{
device.ReceivingStoppedOrErrorOccured -= stopHandler;
// Assuming stop handler has some sort of error property.
tcs.SetException(se.Exception);
}
device.DataAvailable += dataHandler;
device.ReceivingStoppedOrErrorOccured += stopHandler;
device.Start();
return tcs.Task;
}
Upvotes: 3