Reputation: 502
Basically I'm trying to be able to rate limit the execution of iterations of a list.
I really like the idea of using RX as I can build off the top of it, and have a more elegant solution, but it wouldn't have to be done using RX.
I've formulated this with the help of many much smarter than I. My problem is that I'd like to be able to say someCollection.RateLimitedForEach(rate, function), and have it ultimately block until we're done processing... or have it be an async method.
The demo below the function, works in a console app, but if I close after the foreach, it immediately returns.
I'm just kind of at a loss whether this is fixable, or if I should go about it completely different
public static void RateLimitedForEach<T>(this List<T> list, double minumumDelay, Action<T> action)
{
list.ToObservable().Zip(Observable.Interval(TimeSpan.FromSeconds(minumumDelay)), (v, _) => v)
.Do(action).Subscribe();
}
//rate limits iteration of foreach... keep in mind this is not the same thing as just sleeping for a second
//between each iteration, this is saying at the start of the next iteration, if minimum delay time hasnt past, hold until it has
var maxRequestsPerMinute = 60;
requests.RateLimitedForeach(60/maxRequestsPerMinute,(request) => SendRequest(request));
Upvotes: 0
Views: 1118
Reputation: 117175
Your code was just about perfect.
Try this instead:
public static void RateLimitedForEach<T>(this List<T> list, double minumumDelay, Action<T> action)
{
list
.ToObservable()
.Zip(Observable.Interval(TimeSpan.FromSeconds(minumumDelay)), (v, _) => v)
.Do(action)
.ToArray()
.Wait();
}
Upvotes: 0
Reputation: 40335
The concept that you need to get accross, is that the main thread is not waiting on your RateLimitedForEach
call to complete. Also - on your console app - as soon as the main thread ends, the process ends.
What does that mean? It means that the process will end regardless of whatever or not the observer on RateLimitedForEach
has finished executing.
Note: The user may still force the execution of you app to finish, and that is a good thing. You may use a form app if you want to be able to wait without hanging the UI, you may use a service if you don't want the user closing windows related to the process.
Using Task is a superios solution to what I present below.
Notice that when using Tasks on the console app, you still need to wait on the task to prevent the main thread to finish before RateLimitedForEach
completed its job. Moving away from a console app is still advised.
If you insist in continuing using your code, you can tweak it for it to hang the calling thread until completion:
public static void RateLimitedForEach<T>
(
this List<T> list,
double minumumDelay,
Action<T> action
)
{
using (var waitHandle = new ManualResetEventSlim(false))
{
var mainObservable = list.ToObservable();
var intervalObservable = Observable.Interval(TimeSpan.FromSeconds(minumumDelay));
var zipObservable = mainObservable .Zip(intervalObservable, (v, _) => v);
zipObservable.Subscribe
(
action,
error => GC.KeepAlive(error), // Ingoring them, as you already were
() => waitHandle.Set() // <-- "Done signal"
);
waitHandle.Wait(); // <--- Waiting on the observer to complete
}
}
Upvotes: 0
Reputation: 503
Does RX Throttle not do what you want?
https://msdn.microsoft.com/en-us/library/hh229400(v=vs.103).aspx
Upvotes: -1
Reputation: 27871
but it wouldn't have to be done using RX
Here is how you can do it synchronously:
public static void RateLimitedForEach<T>(
this List<T> list,
double minumumDelay,
Action<T> action)
{
foreach (var item in list)
{
Stopwatch sw = Stopwatch.StartNew();
action(item);
double left = minumumDelay - sw.Elapsed.TotalSeconds;
if(left > 0)
Thread.Sleep(TimeSpan.FromSeconds(left));
}
}
And here is how you can do it asynchronously (only potential waits are asynchronous):
public static async Task RateLimitedForEachAsync<T>(
this List<T> list,
double minumumDelay,
Action<T> action)
{
foreach (var item in list)
{
Stopwatch sw = Stopwatch.StartNew();
action(item);
double left = minumumDelay - sw.Elapsed.TotalSeconds;
if (left > 0)
await Task.Delay(TimeSpan.FromSeconds(left));
}
}
Please note that you can change the asynchronous version to make the action it self asynchronous like this:
public static async Task RateLimitedForEachAsync<T>(
this List<T> list,
double minumumDelay,
Func<T,Task> async_task_func)
{
foreach (var item in list)
{
Stopwatch sw = Stopwatch.StartNew();
await async_task_func(item);
double left = minumumDelay - sw.Elapsed.TotalSeconds;
if (left > 0)
await Task.Delay(TimeSpan.FromSeconds(left));
}
}
This is helpful if the action you need to run on each item is asynchronous.
The last version can be used like this:
List<string> list = new List<string>();
list.Add("1");
list.Add("2");
var task = list.RateLimitedForEachAsync(1.0, async str =>
{
//Do something asynchronous here, e.g.:
await Task.Delay(500);
Console.WriteLine(DateTime.Now + ": " + str);
});
Now you should wait for task
to finish. If this is the Main
method, then you need to synchronously wait like this:
task.Wait();
On the other hand, if you are inside an asynchronous method, then you need to asynchronously wait like this:
await task;
Upvotes: 4