Reputation: 2563
We have a source that notifies of changes to data, and when an item comes in we asynchronously fetch the new data.
source.SelectMany(async n => { await FetchData(); });
While waiting for the data to load, many notifications may come in, but we want to ignore all but one, so that we don't fetch data for every single notification and instead fetch only once more.
How can we ignore all notifications coming in from source, except 1, until the data is fetched?
I have a feeling the solution will involve converting the FetchData() to an IObservable, but I still don't know what Rx primitive would allow us to combine the streams.
Upvotes: 3
Views: 190
Reputation: 2962
Looks like a use case for a fairly classic (but missing) Rx operator, ObserveLatestOn (sample implementation here, but you can find others on the web).
source.ObserveLatestOn(TimeSpan.Zero, NewThreadScheduler.Default).SelectMany(async n => { await FetchData(); });
Note that this implementation has only been tested on single-threaded schedulers (mostly UI, but it will work with NewThread), not with Immediate/CurrentThread (may work) or TaskPool (likely has race conditions).
Note also that what you're hitting here is the lack of reactive pull backpressure in Rx.NET (in discussion here). RxJava has nice backpressure support for such cases (e.g. onBackpressureLatest).
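As a rough illustration of "keep only the latest notification while busy", here is a minimal sketch that uses a bounded channel from System.Threading.Channels instead of an Rx operator. It is not the ObserveLatestOn implementation mentioned above. The class name LatestOnlyDemo, the 500 ms delay standing in for FetchData(), and the timings are all assumptions made for the example, and it assumes C# 8 / .NET Core 3.0 or later.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class LatestOnlyDemo
{
    // Returns the number of fetches performed for a burst of 10 notifications.
    public static async Task<int> RunAsync()
    {
        // Capacity 1 + DropOldest: a write never blocks, and at most one
        // (the most recent) notification is buffered while a fetch is running.
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
        {
            FullMode = BoundedChannelFullMode.DropOldest,
            SingleReader = true
        });

        int fetchCount = 0;
        Task consumer = Task.Run(async () =>
        {
            await foreach (int n in channel.Reader.ReadAllAsync())
            {
                fetchCount++;
                Console.WriteLine($"Fetching (triggered by notification {n})");
                await Task.Delay(500); // hypothetical stand-in for FetchData()
            }
        });

        // Simulate a burst of notifications arriving faster than a fetch completes.
        for (int i = 0; i < 10; i++)
        {
            channel.Writer.TryWrite(i);
            await Task.Delay(20);
        }

        channel.Writer.Complete(); // lets the consumer drain the last item and exit
        await consumer;
        return fetchCount;
    }

    public static async Task Main() =>
        Console.WriteLine($"Fetches: {await RunAsync()}");
}
```

With these timings the burst should normally collapse into two fetches: one for the first notification and one more for whatever arrived meanwhile. To feed it from an observable, something like source.Subscribe(n => channel.Writer.TryWrite(n)) should work, assuming source is an IObservable<int>.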
Upvotes: 1
Reputation: 1389
I am sure there is a way to do it with Rx but a simple solution that comes to my mind is to use an AsyncAutoResetEvent (asynchronous version of an AutoResetEvent).
Basically, you create a loop that asynchronously waits for the AsyncAutoResetEvent to be set, which happens whenever a new notification is received. The auto-reset ensures that the next wait blocks asynchronously until another notification arrives.
You can find the AsyncAutoResetEvent class in Stephen Cleary's excellent AsyncEx library, which is available as a NuGet package.
Here is a simple program that shows the proposed solution in action:
using System;
using System.Threading.Tasks;
using Nito.AsyncEx; // AsyncAutoResetEvent comes from the AsyncEx NuGet package

class Program
{
    static readonly AsyncAutoResetEvent _resetEvent = new AsyncAutoResetEvent();

    static void Main(string[] args)
    {
        // Start the asynchronous fetching loop (intentionally not awaited)...
        RunAsync();

        Task.Run(async () =>
        {
            // Simulate fast notifications
            for (int i = 0; i < 15; i++)
            {
                OnNotification(i);
                await Task.Delay(100);
            }

            // Simulate a pause of notifications
            await Task.Delay(2000);

            // Simulate fast notifications
            for (int i = 0; i < 15; i++)
            {
                OnNotification(i);
                await Task.Delay(100);
            }
        });

        Console.ReadKey();
    }

    static void OnNotification(int index)
    {
        Console.WriteLine(DateTime.Now.ToLongTimeString() + " OnNotification " + index);
        // This will unlock the current or next WaitAsync on the _resetEvent
        _resetEvent.Set();
    }

    static async Task RunAsync()
    {
        // Uncomment this if you want to wait for a first notification before fetching.
        // await _resetEvent.WaitAsync();
        while (true)
        {
            Console.WriteLine(DateTime.Now.ToLongTimeString() + " Fetching...");
            // Simulate long fetching
            await Task.Delay(1000);
            // Wait for a new notification before doing another fetch
            await _resetEvent.WaitAsync();
        }
    }
}
And here is the output:
12:04:51 PM Fetching...
12:04:51 PM OnNotification 0
12:04:52 PM OnNotification 1
12:04:52 PM OnNotification 2
12:04:52 PM OnNotification 3
12:04:52 PM OnNotification 4
12:04:52 PM OnNotification 5
12:04:52 PM OnNotification 6
12:04:52 PM OnNotification 7
12:04:52 PM OnNotification 8
12:04:52 PM OnNotification 9
12:04:52 PM Fetching...
12:04:53 PM OnNotification 10
12:04:53 PM OnNotification 11
12:04:53 PM OnNotification 12
12:04:53 PM OnNotification 13
12:04:53 PM OnNotification 14
12:04:53 PM Fetching...
12:04:55 PM OnNotification 0
12:04:55 PM Fetching...
12:04:55 PM OnNotification 1
12:04:55 PM OnNotification 2
12:04:55 PM OnNotification 3
12:04:56 PM OnNotification 4
12:04:56 PM OnNotification 5
12:04:56 PM OnNotification 6
12:04:56 PM OnNotification 7
12:04:56 PM OnNotification 8
12:04:56 PM OnNotification 9
12:04:56 PM Fetching...
12:04:56 PM OnNotification 10
12:04:56 PM OnNotification 11
12:04:56 PM OnNotification 12
12:04:57 PM OnNotification 13
12:04:57 PM OnNotification 14
12:04:57 PM Fetching...
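If you would rather not take a dependency on AsyncEx, the same coalescing idea can be sketched with two flags and a lock. This is a different technique from the AsyncAutoResetEvent loop above (it starts a fetch on the first notification rather than up front), and the names CoalescingFetcher, Notify and CoalescingDemo are made up for the example. It assumes C# 7 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: notifications that arrive while a fetch is running
// are collapsed into a single follow-up fetch.
public sealed class CoalescingFetcher
{
    private readonly Func<Task> _fetch;
    private readonly object _gate = new object();
    private bool _running; // a fetch loop is currently active
    private bool _pending; // at least one notification arrived during a fetch

    public CoalescingFetcher(Func<Task> fetch) => _fetch = fetch;

    public void Notify()
    {
        lock (_gate)
        {
            if (_running) { _pending = true; return; } // remember "one more", drop the rest
            _running = true;
        }
        _ = RunAsync(); // fire and forget; exceptions are handled inside the loop
    }

    private async Task RunAsync()
    {
        while (true)
        {
            try { await _fetch().ConfigureAwait(false); }
            catch (Exception ex) { Console.Error.WriteLine(ex); }

            lock (_gate)
            {
                if (!_pending) { _running = false; return; } // nothing queued: go idle
                _pending = false;                            // consume the queued request
            }
        }
    }
}

public static class CoalescingDemo
{
    // Sends a burst of 10 notifications and returns how many fetches ran.
    public static async Task<int> RunBurstAsync()
    {
        int fetches = 0;
        var fetcher = new CoalescingFetcher(async () =>
        {
            Interlocked.Increment(ref fetches);
            await Task.Delay(500); // simulated long fetch
        });

        for (int i = 0; i < 10; i++) fetcher.Notify();

        await Task.Delay(1500); // long enough for both fetches to finish
        return Volatile.Read(ref fetches);
    }

    public static async Task Main() =>
        Console.WriteLine(await RunBurstAsync()); // prints 2
}
```

The first Notify starts a fetch immediately, the other nine only set the pending flag, and the loop runs exactly one more fetch before going idle.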
Upvotes: 0