Bryan Anderson

Reputation: 16129

ObserveOn(Scheduler.CurrentThread) doesn't cause subscribed Action to be run on original thread

I have an Action that takes a callback, which it calls with a generic argument once it completes, i.e. an Action<Action<T>>. I want to show a busy spinner when the action starts and remove it when the callback is invoked, so I created a simple utility to do that. The problem is that the user will expect their callback to run on the original calling thread, but it does not always do so. It almost always works in unit tests (NUnit) but fails for some calls when the application is actually running (WPF, .NET 4).

Here are the relevant bits of what I have:

void WrapAsyncCallbackPattern<T>(Action<T> callback, Action<Action<T>> actionToRun)
{
    var subject = new AsyncSubject<T>();
    try
    {
        actionToRun(
            result =>
            {
                subject.OnNext(result);
                subject.OnCompleted();
            });
    }
    catch (Exception ex)
    {
        subject.OnError(ex);
    }

    subject
        .ObserveOn(Scheduler.CurrentThread)
        .Subscribe(callback, OnError);
}

I want callback to run on the thread where I subscribe (the same thread on which subject is created), but it doesn't seem to do so reliably. I assume that I'm doing something silly. What is it?

Edit: Added unit test code

private readonly TimeSpan m_WaitTime = TimeSpan.FromSeconds(1);

[Test]
public void WrapAsyncCallbackPattern_WithActionOnDifferentThread_CallsCallbackOnSameThread()
{
    var awaiter = new AutoResetEvent(false);

    bool callbackRan = false;
    int callingThreadId = Thread.CurrentThread.ManagedThreadId;
    int callbackThreadId = int.MinValue;
    int actionThreadId = int.MinValue;

    BackgroundOperation.WrapAsyncCallbackPattern<int>(
        _ =>
        {
            callbackRan = true;
            callbackThreadId = Thread.CurrentThread.ManagedThreadId;

            awaiter.Set();
        },
        cb => ThreadPool.QueueUserWorkItem(
            _ =>
            {
                actionThreadId = Thread.CurrentThread.ManagedThreadId;
                cb(0);
            }));

    var errorInfo = string.Format("\r\nCalling thread = {0}; Action thread = {1}; Callback thread = {2}", callingThreadId, actionThreadId, callbackThreadId);

    Assert.IsTrue(awaiter.WaitOne(m_WaitTime));
    Assert.IsTrue(callbackRan);
    Assert.AreNotEqual(callingThreadId, actionThreadId, "Action needs to be run on a different thread for this test." + errorInfo);
    Assert.AreNotEqual(actionThreadId, callbackThreadId, "Callback should not be run on action thread." + errorInfo);
    Assert.AreEqual(callingThreadId, callbackThreadId, "Callback should be run on calling thread." + errorInfo);
}

Upvotes: 1

Views: 1194

Answers (1)

Enigmativity

Reputation: 117027

You have probably misunderstood what Scheduler.CurrentThread does. I think everyone makes this mistake.

The CurrentThread scheduler refers to whichever thread the observable is executing on when a value is produced, not the thread on which the observable was defined (or subscribed to). Think deferred or lazy execution. This should make sense, because whenever you jump to a different thread you need some way of marshalling the call, and CurrentThread does not provide one.

So what you're really after is something like this:

var synchContext = new SynchronizationContextScheduler(
    System.Threading.SynchronizationContext.Current);

subject
    .ObserveOn(synchContext)
    .Subscribe(callback, OnError);

Or maybe:

subject
    .ObserveOn(this) /* this is my current form */
    .Subscribe(callback, OnError);

If you do that you should be able to control which thread your callbacks get run on.

Your tests probably worked because they ended up executing synchronously.
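
To see the behaviour in isolation, here is a minimal console sketch. It assumes the System.Reactive NuGet package is referenced, and the Program class and variable names are made up for illustration. It subscribes on the main thread with ObserveOn(Scheduler.CurrentThread), then completes the subject from a thread-pool thread and records which thread the callback ran on.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class Program
{
    static void Main()
    {
        var subject = new AsyncSubject<int>();
        var done = new ManualResetEventSlim(false);

        int subscribingThreadId = Thread.CurrentThread.ManagedThreadId;
        int producingThreadId = -1;
        int callbackThreadId = -1;

        // Subscribe on the main thread, as in the question.
        subject
            .ObserveOn(Scheduler.CurrentThread)
            .Subscribe(_ =>
            {
                callbackThreadId = Thread.CurrentThread.ManagedThreadId;
                done.Set();
            });

        // Produce the value on a thread-pool thread.
        ThreadPool.QueueUserWorkItem(_ =>
        {
            producingThreadId = Thread.CurrentThread.ManagedThreadId;
            subject.OnNext(0);
            subject.OnCompleted(); // AsyncSubject emits its value on completion
        });

        done.Wait();

        Console.WriteLine(
            "Subscribing thread = {0}; Producing thread = {1}; Callback thread = {2}",
            subscribingThreadId, producingThreadId, callbackThreadId);
    }
}
```

I would expect the callback thread to match the producing thread rather than the subscribing one, since CurrentThread only queues work on the thread that is already executing. Swapping in a SynchronizationContextScheduler only helps when SynchronizationContext.Current is non-null at the point of capture, which is the case on a WPF UI thread but not in a plain console app like this one.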

Upvotes: 8
