Drake

Reputation: 2703

Call a function at a certain time with C# Reactive Extensions

Is it possible to call a function at a certain time, using Reactive Extensions?

For example, if I want to call method foo() at exactly 9am and 1pm every day, I could use the Timer class, or even Observable.Interval, to check every few seconds whether it's 9am or 1pm. But is there a more efficient way to do this? I'd rather not poll every few seconds; instead I'd like an observable that calls foo() on its own at the right time.

Upvotes: 5

Views: 3135

Answers (1)

Dave Sexton

Reputation: 2652

Just use the overload of Timer that accepts a DateTimeOffset value. You can use Defer and Repeat to create an "absolute interval".

Observable.Defer(() =>
    DateTime.Now.Hour < 9
      ? Observable.Timer(DateTime.Today.AddHours(9))
      : DateTime.Now.Hour < 13
        ? Observable.Timer(DateTime.Today.AddHours(13))
        : Observable.Timer(DateTime.Today.AddDays(1).AddHours(9)))
  .Repeat()
  .Subscribe(...);

Rx automatically ensures, to the best of its ability, that your notification occurs at the exact date and time specified, compensating for timer drift and for system clock changes that happen before the timer duration has elapsed.

Here's an extension method that generalizes the problem further.

Usage:

Observable2.Daily(TimeSpan.FromHours(9), TimeSpan.FromHours(13)).Subscribe(...);

Definition:

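using System;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static partial class Observable2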
{
  public static IObservable<long> Daily(params TimeSpan[] times)
  {
    Contract.Requires(times != null);
    Contract.Requires(Contract.ForAll(times, time => time > TimeSpan.Zero));
    Contract.Requires(Contract.ForAll(times, time => time.TotalDays < 1));

    return Daily(Scheduler.Default, times);
  }

  public static IObservable<long> Daily(IScheduler scheduler, params TimeSpan[] times)
  {
    Contract.Requires(times != null);
    Contract.Requires(Contract.ForAll(times, time => time > TimeSpan.Zero));
    Contract.Requires(Contract.ForAll(times, time => time.TotalDays < 1));

    if (times.Length == 0)
      return Observable.Never<long>();

    // Do not sort in place.
    var sortedTimes = times.ToList();

    sortedTimes.Sort();

    return Observable.Defer(() =>
      {
        var now = DateTime.Now;

        var next = sortedTimes.FirstOrDefault(time => now.TimeOfDay < time);

        var date = next > TimeSpan.Zero
                 ? now.Date.Add(next)
                 : now.Date.AddDays(1).Add(sortedTimes[0]);

        Debug.WriteLine("Next @" + date + " from " + sortedTimes.Aggregate("", (s, t) => s + t + ", "));

        return Observable.Timer(date, scheduler);
      })
      .Repeat()
      .Scan(-1L, (n, _) => n + 1);
  }
}

Update:

If you want to be more "functional" in your approach by defining the input as an infinite sequence from an iterator block, per Jeroen Mostert's answer, then you can use Generate as follows.

Observable.Generate(
  GetScheduleTimes().GetEnumerator(), 
  e => e.MoveNext(), 
  e => e, 
  e => e.Current, 
  e => e.Current);

Jeroen Mostert's answer (deleted) provided an example implementation of GetScheduleTimes, but basically it was just an iterator block that yielded an infinite sequence of DateTimeOffset values in a while loop, with each loop incrementing the values' day by 1.
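
Since the original implementation is no longer available, here is a minimal sketch of what such an iterator block might look like, based only on the description above. The class name Schedule, the parameters (now, times), and the choice to skip times that have already passed today are my assumptions, not the deleted answer's actual code. It uses no Rx types, so it can be checked on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper; the name and signature are assumptions for illustration.
public static class Schedule
{
  // Yields the remaining times for the day of 'now', then every time on each
  // following day, forever. 'now' is passed in so the sequence can be tested.
  public static IEnumerable<DateTimeOffset> GetScheduleTimes(DateTime now, params TimeSpan[] times)
  {
    // Sort a copy so the caller's array is left untouched.
    var sortedTimes = times.OrderBy(t => t).ToArray();

    // With nothing to schedule, end the sequence instead of looping forever.
    if (sortedTimes.Length == 0)
      yield break;

    var day = now.Date;

    while (true)
    {
      foreach (var time in sortedTimes)
      {
        var candidate = day.Add(time);

        // Skip times that have already passed on the first day.
        if (candidate > now)
          yield return new DateTimeOffset(candidate);
      }

      // Advance the whole schedule by one day.
      day = day.AddDays(1);
    }
  }
}
```

With this sketch, the first argument to Generate above would become something like Schedule.GetScheduleTimes(DateTime.Now, TimeSpan.FromHours(9), TimeSpan.FromHours(13)).GetEnumerator(). Because the sequence is infinite, only consume it lazily (for example with Take or through Generate), never with ToList.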

Upvotes: 7
