Flack

Reputation: 5899

Reactive extension timer

I have a HashSet. Occasionally, new values are added to this set. What I am trying to do is have a timer remove each element from the set exactly one minute after it was added.

I am still new to Rx, but this seems like an ideal occasion to use it.

I tried something like this:

AddItem(string item)
{
  _mySet.Add(item);
  var timer = Observable.Timer(TimeSpan.FromSeconds(60), _scheduler);
  timer
      .Take(1)
      .Do(item => RemoveItem(item))
      .Subscribe(_ => Console.WriteLine("Removed {0}", item));
}

It seems to work OK (it passes unit tests).

Does anyone see anything wrong with this approach?

Upvotes: 3

Views: 5027

Answers (4)

Lee Campbell

Reputation: 10783

You don't need to create a sequence to do this. You are already being a good citizen and using a scheduler explicitly, so just use that!

You could just have this for your code:

AddItem(string item)
{
  _mySet.Add(item);
  //Note this does return an IDisposable if you want to cancel the subscription.
  _scheduler.Schedule(
    TimeSpan.FromSeconds(60),
    ()=>
    { 
        RemoveItem(item);
        Console.WriteLine("Removed {0}", item);
    });
}

This basically means there is much less work going on under the covers. Consider all the work the Observable.Timer method is doing, when effectively all you want it to do is schedule an OnNext with a value (that you ignore).

I would also assume that even a user who doesn't know anything about Rx would be able to read this scheduling code too, i.e. "After I add this item, I schedule this remove action to run in 60 seconds".
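To illustrate the IDisposable mentioned in the code comment above, here is a minimal sketch of holding on to the handle returned by Schedule so that a pending removal can be cancelled. The ExpiringSet class, its _pending dictionary, the Contains and Expire methods, and the lock are all assumptions made for illustration and are not part of the question's code. The sketch also assumes that re-adding an item that is already present should keep its original timer.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

// Hypothetical wrapper around the question's HashSet; names are illustrative only.
public sealed class ExpiringSet
{
    private readonly object _gate = new object();
    private readonly HashSet<string> _mySet = new HashSet<string>();
    // One cancellation handle per item that is waiting to expire.
    private readonly Dictionary<string, IDisposable> _pending = new Dictionary<string, IDisposable>();
    private readonly IScheduler _scheduler;

    public ExpiringSet(IScheduler scheduler)
    {
        _scheduler = scheduler;
    }

    public bool Contains(string item)
    {
        lock (_gate) { return _mySet.Contains(item); }
    }

    public void AddItem(string item)
    {
        lock (_gate)
        {
            // Assumption: an item that is already present keeps its original timer.
            if (!_mySet.Add(item)) return;

            var handle = new SingleAssignmentDisposable();
            _pending[item] = handle;
            handle.Disposable = _scheduler.Schedule(
                TimeSpan.FromSeconds(60),
                () => Expire(item, handle));
        }
    }

    // Manual removal: also cancels the scheduled removal for this item.
    public void RemoveItem(string item)
    {
        lock (_gate)
        {
            _mySet.Remove(item);
            if (_pending.TryGetValue(item, out var handle))
            {
                _pending.Remove(item);
                handle.Dispose();
            }
        }
    }

    // Runs on the scheduler; only removes the item if this timer is still the current one.
    private void Expire(string item, IDisposable handle)
    {
        lock (_gate)
        {
            if (_pending.TryGetValue(item, out var current) && ReferenceEquals(current, handle))
            {
                _pending.Remove(item);
                _mySet.Remove(item);
            }
        }
    }
}
```

In a unit test you could pass a virtual-time scheduler such as HistoricalScheduler and call AdvanceBy(TimeSpan.FromSeconds(60)) instead of waiting a real minute.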

Upvotes: 2

Alex

Reputation: 7919

  1. Your lambda in the Do call doesn't look right - Observable.Timer produces long values, but your collection is a HashSet<string> - this shouldn't compile. I'm guessing it was just a typo.

  2. Do: in general your subscription should be done in Subscribe. Do is intended for side effects (I dislike the idea of side-effects in a stream so I avoid it but it is useful for debugging).

  3. Take: Observable.Timer only produces one value before it terminates, so there is no need for the Take operator.

I would write your function as:

AddItem(string item)
{
    _mySet.Add(item);
    Observable.Timer(TimeSpan.FromSeconds(60), _scheduler)
        .Subscribe(_ => RemoveItem(item));
}

Upvotes: 3

JerKimball

Reputation: 16894

Sorry, don't mean to pick on you, but:

ALWAYS DISPOSE IDISPOSABLES!!!!!

(EDIT: Ok, not sure what the heck I put in my coffee this morning, but I answered with a whole mess of nonsense; I'll leave the above only because in general, you do want to make sure to dispose any IDisposable, but in an effort to make up for the babble that follows...)

That call to Subscribe creates a subscription that you are NOT disposing, so multiple calls to this method are just going to queue up more and more crap - now in this specific case, it's not the end of the world since the Timer only fires once, but still...Dispose!

If you really want to use this method (I think a better approach would be to have some running thread/task that "tends" to your values, removing them when it thinks it's necessary), at least try something akin to:

Ok, ignore all that struck-out crap. The implementation of Observable.Timer is this:

public static IObservable<long> Timer(TimeSpan dueTime)
{
    return s_impl.Timer(dueTime);
}

which in turn calls into this:

public virtual IObservable<long> Timer(TimeSpan dueTime)
{
    return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations);
}

which calls...

private static IObservable<long> Timer_(TimeSpan dueTime, IScheduler scheduler)
{
    return new Timer(dueTime, null, scheduler);
}

And here's where things get fun - Timer is a Producer<long>, where the meaty bits are:

private IDisposable InvokeStart(IScheduler self, object state)
{
    this._pendingTickCount = 1;
    SingleAssignmentDisposable disposable = new SingleAssignmentDisposable();
    this._periodic = disposable;
    disposable.Disposable = self.SchedulePeriodic<long>(1L, this._period, new Func<long, long>(this.Tock));
    try
    {
        base._observer.OnNext(0L);
    }
    catch (Exception exception)
    {
        disposable.Dispose();
        exception.Throw();
    }
    if (Interlocked.Decrement(ref this._pendingTickCount) > 0)
    {
        SingleAssignmentDisposable disposable2 = new SingleAssignmentDisposable {
            Disposable = self.Schedule<long>(1L, new Action<long, Action<long>>(this.CatchUp))
        };
        return new CompositeDisposable(2) { disposable, disposable2 };
    }
    return disposable;
}

Now, base._observer is the internal sink set up to trigger on the timer tick, and the Invoke on that sink is:

private void Invoke()
{
    base._observer.OnNext(0L);
    base._observer.OnCompleted();
    base.Dispose();
}

So yes. It auto-disposes itself - and there won't be any "lingering subscriptions" floating around.
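If you want to see that behaviour without reading decompiled source, here is a small sketch that records what a single-shot Observable.Timer emits. It assumes the System.Reactive package and uses its HistoricalScheduler to advance virtual time; the TimerCompletionDemo class and its Run method are hypothetical names made up for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Hypothetical demo class; not part of Rx or the question's code.
public static class TimerCompletionDemo
{
    public static List<string> Run()
    {
        // Virtual-time scheduler, so no real waiting is involved.
        var scheduler = new HistoricalScheduler();
        var events = new List<string>();

        Observable.Timer(TimeSpan.FromSeconds(60), scheduler)
            .Subscribe(
                value => events.Add("OnNext(" + value + ")"),
                () => events.Add("OnCompleted"));

        // Move virtual time past the due time, then well beyond it.
        scheduler.AdvanceBy(TimeSpan.FromSeconds(60));
        scheduler.AdvanceBy(TimeSpan.FromMinutes(10));

        return events;
    }
}
```

Run() should return exactly two entries, "OnNext(0)" followed by "OnCompleted", and nothing further fires during the extra ten minutes of virtual time.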

Mmm....crow is tasty. :|

Upvotes: -3

Ana Betts

Reputation: 74654

If you were using ReactiveUI, a class called ReactiveCollection would definitely help here. You could use it like this:

theCollection.ItemsAdded
    .SelectMany(x => Observable.Timer(TimeSpan.FromSeconds(60), _scheduler).Select(_ => x))
    .Subscribe(x => theCollection.Remove(x));

Upvotes: 0
