Reputation: 125
I have an IObserver class that writes packets to a stream and waits for the correct response. However, I am not happy with part of the code:
bool ResponseReceived = false;

public async Task<IResponse> WriteAsync(Stream stream, bool returnResponse = false, bool flush = true, CancellationToken token = default(CancellationToken))
{
    if (returnResponse)
    {
        // Subscribe to the IObservable
        PacketRouter router = new PacketRouter();
        Subscribe(router);
        // Write the packet to the stream
        await base.WriteAsync(stream, flush, token);
        // I don't like the way this is done. Is it possible to use Task.WhenAny or Task.WhenAll, or something I haven't tried?
        if (!ResponseReceived)
        {
            var ts = TimeSpan.FromSeconds(Timeout);
            DateTime maximumTime = DateTime.Now + ts;
            // Poll every 10 ms until a response arrives or the timeout expires
            while (!ResponseReceived && DateTime.Now < maximumTime)
            {
                await Task.Delay(10);
            }
        }
        // Unsubscribe once the correct type of packet has been received or the wait has timed out
        Unsubscribe();
    }
    else
    {
        // We don't need the return packet, so just write to the stream and exit
        await base.WriteAsync(stream, flush, token);
    }
    // Return the response packet
    return ResponseData;
}

public virtual void OnNext(Packet packet)
{
    // When a packet is received, validate it
    if (ValidResponse(packet))
    {
        // If valid, set the response data
        ResponseData.Payload = packet.Payload;
        ResponseReceived = true; // Signals WriteAsync that the response can now be returned
    }
}
I have tried using TaskCompletionSource and Task.WaitAny(responseReceived, Task.Delay(ts)); but I couldn't get that to work either. Is there a better way to do this?
Updated with a little more context:
The Write class does not read packets. A separate class (PacketHandler) reads them and passes each one to an IObservable class, which forwards it to any class that wishes to listen. The reason for this design is that broadcast messages are also received, and they may arrive between the request and the response. Other packets may also be waiting for a response (although this should never technically happen).
Upvotes: 1
Views: 369
Reputation: 29981
You can directly await an observable, like so:
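// Requires the System.Reactive package and: using System.Reactive.Linq;
var router = new PacketRouter();
// Write the packet to the stream.
await base.WriteAsync(stream, flush, token);
try
{
    // Await the first valid packet from the observable PacketRouter.
    // Timeout throws a TimeoutException if none arrives in time.
    Packet p = await router
        .FirstAsync(ValidResponse)
        .Timeout(TimeSpan.FromSeconds(Timeout));
}
catch (TimeoutException)
{
    // ...
}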
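If pulling in Rx is not an option, the TaskCompletionSource idea from the question can also be made to work with Task.WhenAny. The following is only a minimal sketch under some assumptions: ResponseWaiter<T>, SetResponse and WaitAsync are hypothetical names that are not part of the question's code, and it uses nothing beyond the base class library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the question's code): completes when a
// response is supplied, or fails once the timeout elapses.
public sealed class ResponseWaiter<T>
{
    // RunContinuationsAsynchronously keeps the awaiting code from running
    // inline on the thread that calls SetResponse (e.g. inside OnNext).
    private readonly TaskCompletionSource<T> _tcs =
        new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Call this from OnNext after ValidResponse(packet) has returned true.
    public void SetResponse(T response)
    {
        _tcs.TrySetResult(response);
    }

    // Returns the response, throws TimeoutException if none arrives in time,
    // or throws OperationCanceledException if the caller's token is cancelled.
    public async Task<T> WaitAsync(TimeSpan timeout, CancellationToken token = default(CancellationToken))
    {
        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(token))
        {
            Task delay = Task.Delay(timeout, cts.Token);
            Task winner = await Task.WhenAny(_tcs.Task, delay).ConfigureAwait(false);

            if (winner == _tcs.Task)
            {
                cts.Cancel(); // the timer is no longer needed
                return await _tcs.Task.ConfigureAwait(false);
            }

            // The delay finished first: either the caller cancelled or time ran out.
            token.ThrowIfCancellationRequested();
            throw new TimeoutException("No valid response within " + timeout + ".");
        }
    }
}
```

With something like this, WriteAsync would create the waiter and subscribe before calling base.WriteAsync, so that a fast response is not missed, then await waiter.WaitAsync(TimeSpan.FromSeconds(Timeout), token) and unsubscribe in a finally block. OnNext would call SetResponse instead of setting a flag, which removes the 10 ms polling loop.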
Upvotes: 2