Flack
Flack

Reputation: 5889

Reactive extensions Subject uses

I am at the early stages of learning about Rx and have come across the Subject class. I don't quite understand why this class exists. I understand that it implements both IObservable and IObserver but what are Subjects used for?

As far as I can tell, they can act as a proxy between a source and a bunch of subscribers but couldn't the subscribers just subscribe directly to the source? When I see instances of a Subject being used as an observable and observer I get confused.

I am sure I am just not getting some basic fact here but I don't know what Subject brings to the game. I guess I am looking for some basic (but hopefully real world) example of when Subjects are useful and when they are not (as I have also read that Subjects are not usually used, replaced with Observable.Create).

Upvotes: 0

Views: 796

Answers (2)

Rajnikant
Rajnikant

Reputation: 2236

Using subjects means we are now managing state, which is potentially mutating. Mutating state and asynchronous programming are very hard to get right. Furthermore many of the operators (extension methods) have been carefully written to ensure correct and consistent lifetime of subscriptions and sequences are maintained. When you introduce subjects you can break this.

A significant benefit that the Create method has over subjects is that the sequence will be lazily evaluated.

In this example we show how we might first return a sequence via standard blocking eagerly evaluated call, and then we show the correct way to return an observable sequence without blocking by lazy evaluation.

Below example will be blocked for at least 1 second before they even receive the IObservable, regardless of if they do actually subscribe to it or not.

private IObservable<string> BlockingMethod()
{
var subject = new ReplaySubject<string>();
subject.OnNext("a");
subject.OnNext("b");
subject.OnCompleted();
Thread.Sleep(1000);
return subject;
}

Where as in bleow example consumer immediately receives the IObservable and will only incur the cost of the thread sleep if they subscribe.

private IObservable<string> NonBlocking()
{

return Observable.Create<string>(

(IObserver<string> observer) =>
{

observer.OnNext("a");
observer.OnNext("b");
observer.OnCompleted();
Thread.Sleep(1000);
return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
//or can return an Action like 
//return () => Console.WriteLine("Observer has unsubscribed"); 
});
}

Upvotes: 0

JerKimball
JerKimball

Reputation: 16894

First, a lot of folks will tell you Subject<T> doesn't belong, since it goes against some other tenets/patterns in the Rx framework.

That said, they act as either an IObservable or an IObserver, so you get some useful functionality out of them - I generally use them during the initial development stages for:

  • A "debug point" of sorts, where I can subscribe to an IObservable chain inline with a Subject<T>, and inspect the contents with the debugger.

  • An "observable on demand", where I can manually call OnNext and pass in data I want to inject into the stream

  • Used to use them to replicate what ConnectableObserable now does - a "broadcast" mechanism for multiple subscribers to a single Observable, but that can be done with Publish now.

  • Bridging layer between disparate systems; again, this is largely unnecessary now with the various FromAsync, FromEvent extensions, but they can still be used as such (basically, the "old" system injects events into the Subject<T> via OnNext, and from then on the normal Rx flow.

Upvotes: 1

Related Questions