Reputation: 3567
I would like to convert my legacy event-based method to an observable-based one, but I am quite new to Rx and am stuck.
I have an event source, which is now an observable. At a certain point, I have to start a method that ends either by returning the next element in the sequence or by returning null if it times out.
The event-based approach looks like this:
public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
{
    ReaderEvent result = null;
    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(new [] { topLevelToken }))
    {
        cts.CancelAfter(waitFor);
        EventHandler<ReaderEvent> localHandler = (o, e) =>
        {
            if (e.PlaceId == PlaceId)
            {
                result = e;
                cts.Cancel();
            }
        };
        ReaderEventHandler += localHandler;
        try
        {
            await Task.Delay(waitFor, cts.Token).ConfigureAwait(false);
        }
        catch (OperationCanceledException) { }
        catch (Exception ex)
        {
            //...
        }
        ReaderEventHandler -= localHandler;
    }
    return result;
}
As you can see, the idea is that the delay is cancelled either by the arrival of the event I am waiting for or by the token source itself, which CancelAfter cancels once the specified time has elapsed. Quite clean.
Now, the Rx version:
public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
{
    ReaderEvent result = null;
    var observable = _OnReaderEvent.FirstAsync(r => r.PlaceId == PlaceId);
    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(new [] { topLevelToken }))
    {
        cts.CancelAfter(waitFor);
        using (observable.Subscribe(x => {
            result = x;
            cts.Cancel();
        }))
        {
            try
            {
                await Task.Delay(waitFor, cts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException) { }
        }
    }
    return result;
}
Not so clean... even worse... I have tried the Timeout extension too. But as this is a one-shot subscription, I still need to wait somehow before I dispose of the subscription. The only difference would be that OnError would cancel the local token, rather than the built-in CancelAfter mechanism.
Is there any better / more concise (more reliant on Rx) way to do this?
Thank you!
Upvotes: 4
Views: 2618
Reputation: 43996
This problem can be solved in lots of different ways. Here is one, utilizing the Amb, Return, Delay and FirstAsync operators:
public Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
{
    return _OnReaderEvent
        .Where(r => r.PlaceId == PlaceId)
        .Amb(Observable.Return(default(ReaderEvent)).Delay(waitFor))
        .FirstAsync()
        .ToTask();
}
In the abnormal case that the _OnReaderEvent observable is completed or completes during the waiting, the resulting Task will transition to a faulted state, with the exception being InvalidOperationException "Sequence contains no elements".
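If a null result is preferable to a faulted task in that abnormal case, one possible variation (a sketch, not part of the code above) swaps FirstAsync for FirstOrDefaultAsync, which yields the default value when the sequence completes without elements. The ReaderEvent class and the Subject-backed _OnReaderEvent field below are hypothetical stand-ins for the question's own types:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's event type.
public sealed class ReaderEvent
{
    public int PlaceId { get; set; }
}

public sealed class ReaderWaiter
{
    // Hypothetical stand-in for the question's event source.
    private readonly Subject<ReaderEvent> _OnReaderEvent = new Subject<ReaderEvent>();

    public Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
    {
        return _OnReaderEvent
            .Where(r => r.PlaceId == PlaceId)
            // Whichever sequence reacts first wins: a matching event or the delayed null.
            .Amb(Observable.Return(default(ReaderEvent)).Delay(waitFor))
            // Yields null instead of throwing if the source completes without a match.
            .FirstOrDefaultAsync()
            .ToTask();
    }
}
```

With this variation the caller cannot tell a timeout from a completed source, since both produce null; whether that is acceptable depends on how the result is used.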
Another implementation, functionally equivalent to the previous one, using the Timeout operator:
public Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
{
    return _OnReaderEvent
        .Where(r => r.PlaceId == PlaceId)
        .FirstAsync()
        .Timeout(waitFor, Observable.Return(default(ReaderEvent)))
        .ToTask();
}
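To see both outcomes (a match and a timeout), here is a small console sketch built around the Timeout version. It assumes a Subject<ReaderEvent> as the event source and a minimal ReaderEvent class; both are hypothetical stand-ins, since the question does not show the real types:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's event type.
public sealed class ReaderEvent
{
    public int PlaceId { get; set; }
}

public static class Demo
{
    // Hypothetical hot source standing in for the question's _OnReaderEvent.
    private static readonly Subject<ReaderEvent> _OnReaderEvent = new Subject<ReaderEvent>();

    public static Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
    {
        return _OnReaderEvent
            .Where(r => r.PlaceId == PlaceId)
            .FirstAsync()
            .Timeout(waitFor, Observable.Return(default(ReaderEvent)))
            .ToTask();
    }

    public static async Task Main()
    {
        // ToTask subscribes right away, so events pushed after this call are observed.
        var pending = WaitForReaderAsync(1, TimeSpan.FromSeconds(5));
        _OnReaderEvent.OnNext(new ReaderEvent { PlaceId = 2 }); // filtered out by Where
        _OnReaderEvent.OnNext(new ReaderEvent { PlaceId = 1 }); // completes the task
        var hit = await pending;
        Console.WriteLine(hit?.PlaceId);

        // No event arrives within the window, so the fallback sequence supplies null.
        var miss = await WaitForReaderAsync(1, TimeSpan.FromMilliseconds(100));
        Console.WriteLine(miss == null);
    }
}
```

Because the subject is hot, an event raised before WaitForReaderAsync is called is not replayed; only events arriving during the wait can satisfy the task.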
Upvotes: 1
Reputation: 117175
Why not just go with a simple Rx version of the code:
public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
{
    return await
        _OnReaderEvent
            .Where(r => r.PlaceId == PlaceId)
            .Buffer(waitFor, 1)
            .Select(xs => xs.FirstOrDefault())
            .FirstOrDefaultAsync()
            .ToTask();
}
Upvotes: 1
Reputation: 2946
You could try:
var values = await _OnReaderEvent
    .Where(r => r.PlaceId == placeId)
    .Buffer(waitFor, 1)
    .FirstAsync(); // get list of matching elements during waitFor time
return values.FirstOrDefault(); // return first element or null if the list is empty
Upvotes: 3