Doanair

Reputation: 538

Reactive Extensions IObservable notify when subscribers subscribe

I have a Subject<T> and I want to be notified when something subscribes to it. I cannot find a mechanism for this. Am I missing something?

For example:

    public class AccountManager
    {
        private ReplaySubject<string> _accountEvents = new ReplaySubject<string>();

        public AccountManager()
        {
        }

        public void Add(string val)
        {
            _accountEvents.OnNext(val);
        }

        public IObservable<string> AccountEvents { get { return _accountEvents.AsObservable<string>(); } }

    }

Can AccountManager be notified whenever other code calls Subscribe on the observable returned by the AccountEvents property?

Upvotes: 2

Views: 502

Answers (2)

Sergey Aldoukhov

Reputation: 22744

Either this

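        var subj = new Subject<string>();
        // Note: in later Rx releases CreateWithDisposable was folded into Observable.Create.
        var o = Observable.CreateWithDisposable<string>(observer =>
        {
            // Runs on every Subscribe call made against o.
            Console.WriteLine("subscribed");
            // Forward the subject's notifications to the new observer.
            return subj.Subscribe(observer);
        });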

or (similar to Scott's answer)

        var o = Observable.Defer(() =>
        {
            // Defer invokes this factory once per subscription.
            Console.WriteLine("subscribed");
            return subj;
        });

The first option gives you more freedom: the observer is passed in as a parameter, so in addition to routing the subject's output to it, you can call observer.OnNext, OnError, or OnCompleted yourself whenever your logic demands it.
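
Applied to the AccountManager from the question, the first option might look like the sketch below. It assumes a current System.Reactive package, where the call is spelled Observable.Create rather than CreateWithDisposable. The SubscriberCount property and the Program class are hypothetical additions for illustration and are not part of the original code.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public class AccountManager
{
    private readonly ReplaySubject<string> _accountEvents = new ReplaySubject<string>();

    // Hypothetical bookkeeping, added only to show the notification at work.
    private int _subscriberCount;

    public int SubscriberCount
    {
        get { return Volatile.Read(ref _subscriberCount); }
    }

    public void Add(string val)
    {
        _accountEvents.OnNext(val);
    }

    public IObservable<string> AccountEvents
    {
        get
        {
            return Observable.Create<string>(observer =>
            {
                // Runs once for each Subscribe call on the returned observable.
                int count = Interlocked.Increment(ref _subscriberCount);
                Console.WriteLine("subscribed, count = " + count);

                // Route the subject's notifications to the new observer.
                IDisposable inner = _accountEvents.Subscribe(observer);

                // Runs when the subscription is disposed.
                return Disposable.Create(() =>
                {
                    inner.Dispose();
                    int remaining = Interlocked.Decrement(ref _subscriberCount);
                    Console.WriteLine("unsubscribed, count = " + remaining);
                });
            });
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var manager = new AccountManager();
        manager.Add("opened");

        // Subscribing triggers the callback inside AccountEvents.
        using (manager.AccountEvents.Subscribe(x => Console.WriteLine(x)))
        {
            manager.Add("closed");
        }
    }
}
```

Returning a Disposable.Create wrapper instead of the inner subscription directly is what lets the manager also observe unsubscription. If you only care about the subscribe side, returning `_accountEvents.Subscribe(observer)` as in the snippet above is enough.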

Upvotes: 5

Scott Weinstein

Reputation: 19117

This is ugly, but it could possibly be made to work:

    var subj = new Subject<string>();
    var io = Observable.Defer(() => Observable.Return("Subscribed")
                                              .Do(Console.WriteLine))
                       .Concat(subj);
    io.Subscribe();

Upvotes: 0
