Gilad
Gilad

Reputation: 588

Reactive Extensions, Subject<T>

I'm having some hard time understanding the Subject object.

Consider the following code:

        var sub = new Subject<int>();
        sub.Subscribe(x => Console.WriteLine(x));    //subscriber #1        
        sub.Subscribe(x => Console.WriteLine(x));    //subscriber #2        
        sub.OnNext(2);

I'm creating a subject of int, and when i execute OnNext it calls the other subscribers (#1 and #2). The thing that i don't get is that i read that Subject means an object which is both observable and observer but how does this explains why when i call OnNext the other subscribers are called.

I would understand if OnNext of the subject would propagate it to all subscribers = publish to all others (which make sense) but when I checked the source code i couldn't see anything that does it, see below.

Can someone maybe understand from the code below what exactly makes the OnNext(2) propagate to the other subscriptions? (#1, #2)?

public sealed class Subject : ISubject, ISubject, IObserver, IObservable, IDisposable { // Fields private volatile IObserver _observer;

// Methods
public Subject()
{
    this._observer = NopObserver<T>.Instance;
}

public void Dispose()
{
    this._observer = DisposedObserver<T>.Instance;
}

public void OnCompleted()
{
    IObserver<T> comparand = null;
    IObserver<T> completed = DoneObserver<T>.Completed;
    do
    {
        comparand = this._observer;
    }
    while (((comparand != DisposedObserver<T>.Instance) && !(comparand is DoneObserver<T>)) && (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, completed, comparand) != comparand));
    comparand.OnCompleted();
}

public void OnError(Exception error)
{
    if (error == null)
    {
        throw new ArgumentNullException("error");
    }
    IObserver<T> comparand = null;
    DoneObserver<T> observer3 = new DoneObserver<T> {
        Exception = error
    };
    DoneObserver<T> observer2 = observer3;
    do
    {
        comparand = this._observer;
    }
    while (((comparand != DisposedObserver<T>.Instance) && !(comparand is DoneObserver<T>)) && (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, observer2, comparand) != comparand));
    comparand.OnError(error);
}

public void OnNext(T value)
{
    this._observer.OnNext(value);
}

public IDisposable Subscribe(IObserver<T> observer)
{
    if (observer == null)
    {
        throw new ArgumentNullException("observer");
    }
    IObserver<T> comparand = null;
    IObserver<T> observer3 = null;
    do
    {
        comparand = this._observer;
        if (comparand == DisposedObserver<T>.Instance)
        {
            throw new ObjectDisposedException("");
        }
        if (comparand == DoneObserver<T>.Completed)
        {
            observer.OnCompleted();
            return Disposable.Empty;
        }
        DoneObserver<T> observer4 = comparand as DoneObserver<T>;
        if (observer4 != null)
        {
            observer.OnError(observer4.Exception);
            return Disposable.Empty;
        }
        if (comparand == NopObserver<T>.Instance)
        {
            observer3 = observer;
        }
        else
        {
            Observer<T> observer5 = comparand as Observer<T>;
            if (observer5 != null)
            {
                observer3 = observer5.Add(observer);
            }
            else
            {
                observer3 = new Observer<T>(new ImmutableList<IObserver<T>>(new IObserver<T>[] { comparand, observer }));
            }
        }
    }
    while (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, observer3, comparand) != comparand);
    return new Subscription<T>((Subject<T>) this, observer);
}

private void Unsubscribe(IObserver<T> observer)
{
    IObserver<T> comparand = null;
    IObserver<T> instance = null;
Label_0004:
    comparand = this._observer;
    if ((comparand != DisposedObserver<T>.Instance) && !(comparand is DoneObserver<T>))
    {
        Observer<T> observer4 = comparand as Observer<T>;
        if (observer4 != null)
        {
            instance = observer4.Remove(observer);
        }
        else
        {
            if (comparand != observer)
            {
                return;
            }
            instance = NopObserver<T>.Instance;
        }
        if (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, instance, comparand) != comparand)
        {
            goto Label_0004;
        }
    }
}

// Properties
public bool HasObservers
{
    get
    {
        return (((this._observer != NopObserver<T>.Instance) && !(this._observer is DoneObserver<T>)) && (this._observer != DisposedObserver<T>.Instance));
    }
}

// Nested Types
private class Subscription : IDisposable
{
    // Fields
    private IObserver<T> _observer;
    private Subject<T> _subject;

    // Methods
    public Subscription(Subject<T> subject, IObserver<T> observer)
    {
        this._subject = subject;
        this._observer = observer;
    }

    public void Dispose()
    {
        IObserver<T> observer = Interlocked.Exchange<IObserver<T>>(ref this._observer, null);
        if (observer != null)
        {
            this._subject.Unsubscribe(observer);
            this._subject = null;
        }
    }
}

}

Upvotes: 0

Views: 2175

Answers (2)

Gilad
Gilad

Reputation: 588

I know that but what bothers me is that it didn't make sense. I dug further into the code and found out that their internal implementation of observer contains more observers, see below.

And if you check the OnNext method you can see that they are iterating over all the observers and calling their OnNext method.

Now everything make sense to me, i understood the logic but couldn't see where it was implemented.

internal class Observer<T> : IObserver<T>
{
    private readonly ImmutableList<IObserver<T>> _observers;

    public Observer(ImmutableList<IObserver<T>> observers)
    {
        this._observers = observers;
    }

    internal IObserver<T> Add(IObserver<T> observer)
    {
        return new Observer<T>(this._observers.Add(observer));
    }

    public void OnCompleted()
    {
        foreach (IObserver<T> observer in this._observers.Data)
        {
            observer.OnCompleted();
        }
    }

    public void OnError(Exception error)
    {
        foreach (IObserver<T> observer in this._observers.Data)
        {
            observer.OnError(error);
        }
    }

    public void OnNext(T value)
    {
        foreach (IObserver<T> observer in this._observers.Data)
        {
            observer.OnNext(value);
        }
    }

    internal IObserver<T> Remove(IObserver<T> observer)
    {
        int index = Array.IndexOf<IObserver<T>>(this._observers.Data, observer);
        if (index < 0)
        {
            return this;
        }
        if (this._observers.Data.Length == 2)
        {
            return this._observers.Data[1 - index];
        }
        return new Observer<T>(this._observers.Remove(observer));
    }
}

Upvotes: 1

Alex Shtoff
Alex Shtoff

Reputation: 2640

Subject is an observable because you can subscribe to it. You do it in your example (you did subscribe two subscribers).

Subject is also an observer because you can do the following:

someObservable.Subscribe(subject);

That way your subject will receive the events from someObservable and propagate them to its own subscribers.

P.S. in your code you called the OnNext() method yourself. But that is exactly what will someObservable do when you subscribe to it with your subject.

Upvotes: 0

Related Questions