mrt181

Reputation: 5316

Why does SubscribeOn not work here as an extension method, but work as a direct call?

I have a class ObserverBase. In its Start method I subscribe to an observable, then apply a transformation and a filter.

I do not understand why the usage of SubscribeOn as an extension method does not compile. Can anyone explain this behavior?

public class ObserverBase<T1, T2> : IPeer<T1, T2>
{

    private readonly ISubject<T2> subject;

    private readonly IMapper<T1, T2> messageMapper;

    protected ObserverBase(ISubject<T2> subject, IMapper<T1, T2> messageMapper)
    {
        this.subject = subject;
        this.messageMapper = messageMapper;
    }

    public IObservable<T2> Start(IObservable<T1> observable, Func<T2, bool> predicate)
    {
        //works
        Synchronization.ObserveOn(Synchronization.SubscribeOn(observable, TaskPoolScheduler.Default),
                                  Scheduler.Immediate)
                  .Select(message => this.messageMapper.Map(message))
                  .Where(predicate)
                  .Subscribe(observation => this.subject.OnNext(observation));


        // compile error on SubscribeOn: The type arguments cannot be inferred from the usage. Try specifying the type argument explicitly.
        observable.SubscribeOn(TaskPoolScheduler.Default).ObserveOn(Scheduler.Immediate)
                  .Select(message => this.messageMapper.Map(message))
                  .Where(predicate)
                  .Subscribe(observation => this.subject.OnNext(observation));

        // compile error on TaskPoolScheduler.Default: Cannot resolve method SubscribeOn(System.Reactive.Concurrency.TaskPoolScheduler)
        observable.SubscribeOn<T1>(TaskPoolScheduler.Default).ObserveOn(Scheduler.Immediate)
                  .Select(message => this.messageMapper.Map(message))
                  .Where(predicate)
                  .Subscribe(observation => this.subject.OnNext(observation));

        return this.subject;
    }
}

Upvotes: 0

Views: 242

Answers (3)

mrt181

Reputation: 5316

Visual Studio compiles the code; it was ReSharper that reported these errors. After I restarted Visual Studio, ReSharper stopped complaining.

Upvotes: 0

James World

Reputation: 29786

This works for me. Did you include a using directive for the System.Reactive.Linq namespace (where SubscribeOn is defined)? Synchronization.ObserveOn is in a different namespace (System.Reactive.Concurrency), which I suspect you already have a using directive for.
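As a minimal sketch of the using directives mentioned above, assuming the System.Reactive NuGet package is referenced (the Example class and Run method are hypothetical names used only for illustration):

```csharp
using System;
using System.Reactive.Concurrency; // TaskPoolScheduler, Scheduler, and the static Synchronization class
using System.Reactive.Linq;        // extension methods on IObservable<T>: SubscribeOn, ObserveOn, Select, Where

// Hypothetical example type, not part of the question's code.
public static class Example
{
    public static IDisposable Run(IObservable<int> source)
    {
        // With System.Reactive.Linq imported, SubscribeOn binds as an
        // extension method and T is inferred from the source observable.
        return source
            .SubscribeOn(TaskPoolScheduler.Default)
            .ObserveOn(Scheduler.Immediate)
            .Select(x => x * 2)
            .Where(x => x > 0)
            .Subscribe(x => Console.WriteLine(x));
    }
}
```

If the System.Reactive.Linq directive is removed, the extension-method calls in this chain should no longer resolve, while a fully qualified static call would still compile.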

Upvotes: 3

Lorenzo Dematté

Reputation: 7839

Extension methods rank quite low in overload resolution; the compiler considers them only after "closer" candidates have been tried: http://ericlippert.com/2013/12/23/closer-is-better/

You might find this answer useful too: https://stackoverflow.com/a/25564127/863564 It seems to be directly related to your problem.
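To illustrate the general rule (not necessarily the exact cause in the question), here is a small self-contained sketch with hypothetical types, Greeter and GreeterExtensions. An applicable instance method is chosen even when an extension method has a more specific parameter type:

```csharp
using System;

// Hypothetical type for illustration only.
public class Greeter
{
    // Instance method: applicable to any argument, since everything converts to object.
    public string Greet(object who) => "instance";
}

public static class GreeterExtensions
{
    // Extension method with a more specific parameter type.
    public static string Greet(this Greeter g, string who) => "extension";
}

public static class Program
{
    public static void Main()
    {
        var g = new Greeter();

        // The instance method is applicable, so the compiler never
        // looks at the extension method for this call.
        Console.WriteLine(g.Greet("world")); // prints "instance"

        // Calling the extension method explicitly as a static method still works.
        Console.WriteLine(GreeterExtensions.Greet(g, "world")); // prints "extension"
    }
}
```

This is why calling the static method directly can behave differently from the extension-method syntax on the same arguments.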

Upvotes: 1
