Daniel Müller
Daniel Müller

Reputation: 453

EventLoopScheduler: unexpected behavior on Dispose ObjectDisposedException

When calling Dispose on an EventLoopScheduler (that has at least one item in its working queue) it will throw an ObjectDisposedException. The exception is thrown from its worker thread.

I've seen and read the two questions that already exist:

However, I think some of the answers are not quite correct, Quoting Intro to Rx regarding EventLoopScheduler:

The EventLoopScheduler implements IDisposable, and calling Dispose will allow the thread to terminate. As with any implementation of IDisposable, it is appropriate that you explicitly manage the lifetime of the resources you create.

Source: http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#EventLoopScheduler

They provide an example about how to use EventLoopScheduler correctly:

Observable
    .Using(()=>new EventLoopScheduler(), els=> GetPrices(els))
    .Subscribe(...)

Unfortunately this example does not work (at least not for me :-). Given this piece of code:

internal class Program
{
    private static void Main(string[] args)
    {
        var source = new Subject<string>();
        var subscription = Observable.Using(
            () => new EventLoopScheduler(),
            scheduler => source
                .ObserveOn(scheduler)
                .Do(LongRunningAction))
            .Subscribe();

        source.OnNext("First action (2 seconds)");
        Thread.Sleep(TimeSpan.FromSeconds(1));
        subscription.Dispose(); // Scheduler is still busy!
        Console.ReadLine(); 
    }

    private static void LongRunningAction(string text) {
        Thread.Sleep(TimeSpan.FromSeconds(2));
        Console.WriteLine(text);
    }
}

I would expect see a text message after 2 seconds without any error (even though the subscription has been disposed after 1 second). The EventLoopScheduler cannot cancel an ongoing operation, that's okay for me.

What you actually get, is the message and an unhandled ObjectDisposedException.

So, is this a bug or am I doing it wrong? :-)

To workaround this exception, I currently wrap the EventLoopScheduler and call scheduler.Schedule(() => scheduler.Dispose()) on wrapper.Dispose().

Upvotes: 1

Views: 1139

Answers (3)

Daniel M&#252;ller
Daniel M&#252;ller

Reputation: 453

Okay, I have something that works. But it is not thread-safe, relevant code line is marked with comment. Guess I should open a bug ticket :-/

private static void Main(string[] args)
{
    var originSource = new Subject<string>();
    var subscription = UsingEventLoop(originSource)
        .Do(LongRunningAction) // runs on EventLoopScheduler thread
        .Subscribe();

    originSource.OnNext("First action (appears after 2 seconds)");
    originSource.OnNext("Second action (must not appear");

    Thread.Sleep(TimeSpan.FromSeconds(1));
    subscription.Dispose(); // Scheduler is still busy with first action!

    Console.WriteLine("Press any key to exit.");
    Console.ReadLine();
}

private static IObservable<TValue> UsingEventLoop<TValue>(IObservable<TValue> source)
{
    return Observable.Using(
        () => new EventLoopScheduler(),
        scheduler => Observable.Create<TValue>((obs, ct) =>
        {
            return Task.FromResult(source.Subscribe(value =>
            {
                // The following check+call is NOT thread safe!
                if (!ct.IsCancellationRequested) 
                {
                    scheduler.Schedule(() => obs.OnNext(value));
                }
            }));
        }));
}

private static void LongRunningAction<TValue>(TValue value) {
    Thread.Sleep(TimeSpan.FromSeconds(2));
    Console.WriteLine(value);
}

Upvotes: 0

Lee Campbell
Lee Campbell

Reputation: 10783

My comments above with James' answer. This "answer" is here to provide the sample code that "fixes" the issue.

I do however believe that there is a bug with the EventLoopScheduler. I don't think* it should continue to schedule work recursively if it has been disposed.

void Main()
{
    //In my example GetPrices is the source. 
    //  I meant that you could use an ELS to do some heavy work to get prices.
    //var source = new Subject<string>();   
    var subscription = Observable.Using(
        () => new EventLoopScheduler(),
        scheduler =>
        {
            return Observable.Create<string>((obs, ct) =>
            {
                var scheduleItem = scheduler.Schedule(0, (state,self) => {
                    //Do work to get price (network request? or Heavy CPU work?)
                    var price = state.ToString("c");
                    LongRunningAction(price);
                    obs.OnNext(price);
                    //Without this check, we see that the Scheduler will try to 
                    //  recursively call itself even when disposed.
                    if(!ct.IsCancellationRequested)
                        self(state+1);
                });
                return Task.FromResult(scheduleItem);
            });
        })
        .Subscribe();

    Thread.Sleep(TimeSpan.FromSeconds(1));
    subscription.Dispose(); // Scheduler is still busy!
    Console.ReadLine();
}

private static void LongRunningAction(string text)
{
    Thread.Sleep(TimeSpan.FromSeconds(2));
    Console.WriteLine(text);
}

*but I completely reserve the right to change my mind when I am convinced otherwise.

FWIW : Generally I only use an ELS as a readonly field in a service that I want to dedicate a thread to processing some inbound work. e.g. I only want to use one thread to read from the network or disk for that service. In this case I create an ELS and it will do any work. It then is disposed when the class containing it is disposed. I don't think I would use it very often at all as the sample from IntroToRx.com shows.

Upvotes: 1

James World
James World

Reputation: 29796

So, having been quoted out of context, I am forced to respond. :) Let's expand the quote to the important bit:

You have no business disposing that EventLoopScheduler! Once you have passed it to other Rx Operators, you have passed on the responsibility for it.

The problem is, that you are trying to have the observer (subscriber) clean up the scheduler. But the observer passed the scheduler to the observable. If you want to dispose the scheduler, you have to consider that it's the that observable that now "owns" it. The observable knows:

  • When it is subscribed to
  • When it is unsubscribed from
  • When it has sent OnComplete/OnError to all it's subscribers

With that information, it's perfectly placed to know when any scheduler it as been given can be disposed. (Even then, if you are trying to go for a generic cleanup, it needs to do the dispose of the scheduler in a finalizer as that's the only point it can guarantee another subscriber isn't go to come along without special knowledge.)

An individual subscriber however, has no guarantee to have any of this information - knowledge of potential other subscribers, and when the last events have been dispatched are not exposed to it. The observable it passed the scheduler to can party on it in all kinds of funky ways: calling crazy methods that sleep a lot; fabricating events out of thin air just because it fancies it; delaying events until next Tuesday; responding to unsubscribe events by pinning a note on the fridge and promising to get to that mañana, honest.

So, you want to clean up that scheduler safely every time? Then you need to have your observable do it.

The built in operators don't bother with any of this though - I suspect it wasn't considered a big concern because it's just not necessary in most use cases. In fact, I don't think I've ever seen a case where one has needed dispose an EventLoopScheduler - they've always been used for the life of the program. It's easy to get hung up on thinking you need to dispose every IDisposable you see - but actually with Rx, it's often just not necessary (especially with subscriptions, where Dispose it really just a request to cancel a subscription - not a command to clean up resources. The Rx team didn't want to create another interface when IDisposable made a perfectly good subscription handle.)

The EventLoopScheduler suspends it's thread when it's not busy - so most of the time you just don't need to worry about clean up, unless you are creating some arbitrary number of them (hint: you really shouldn't need to do this).

If you do, you might want to look if the NewThreadScheduler will do instead, which actually uses an EventLoopScheduler under the covers, in a special secret (i.e. internal) mode that quits the thread if the scheduler queue is empty - but reuses it otherwise. Yes, despite popular misconceptions to the contrary, the NewThreadScheduler does reuse threads and therefore doesn't have a big associated thread creation cost under load from a single subscriber. It's only when multiple subscribers are in play that multiple threads show up, or when it goes idle that the next event will cause thread creation to occur.

But if you are using an EventLoopScheduler, you are probably using it in one spot to tie things to one globally shared event loop (after all - that's what event loops usually do - centralize events onto a single thread in an app) - so cleaning up that thread is rarely necessary, as it will go when the process dies anyway.

Upvotes: 0

Related Questions