Snak

Reputation: 693

Count all subscriptions of a subject

I have a Subject to which I subscribe methods that should be called when a particular event in a game happens.

public Subject<SomeEvent> TestSubject = new Subject<SomeEvent>();

Some instances subscribe to that Subject.

TestSubject.Subscribe(MyMethod);

My objective is to count how many methods have been subscribed to that Subject. I've seen some examples using the Count() extension, but I need an int as a return value so that I can use it somewhere else, and Count() returns an IObservable<int>.

if (subjectCount > 0)
{
    DoSomething();
}

Is there any way to get the number of subscriptions on a subject, or do I need to keep track of them manually (having a public int SubjectSubcriptions and adding 1 every time I subscribe a method)?

Upvotes: 4

Views: 829

Answers (2)

Lee Campbell

Reputation: 10783

I am curious whether this is only for testing purposes. If so, the Rx-Testing NuGet package has the tools to give you this information.

For example, you can validate the number of subscriptions in a unit test like this:

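// Requires: using Microsoft.Reactive.Testing; (from the Rx-Testing package)
TestScheduler scheduler = new TestScheduler();
var obs = scheduler.CreateColdObservable(
      ReactiveTest.OnNext(1, "foo"),
      ReactiveTest.OnNext(1000, "bar")
    );

// Do some work that should add subscriptions.

// Subscriptions is a list of recorded subscriptions, so compare its Count.
Assert.AreEqual(expectedSubscriptionCount, obs.Subscriptions.Count);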

Upvotes: 2

CharlesNRice

Reputation: 3259

The easiest way would be to create your own implementation of ISubject<T> that wraps a subject and counts its subscriptions.

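using System;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Threading;

public class CountSubject<T> : ISubject<T>, IDisposable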
{
    private readonly ISubject<T> _baseSubject;
    private int _counter;
    private IDisposable _disposer = Disposable.Empty;
    private bool _disposed;

    public int Count
    {
        get { return _counter; }
    }

    public CountSubject()
        : this(new Subject<T>())
    {
        // Need to clear up Subject we created
        _disposer = (IDisposable) _baseSubject;
    }

    public CountSubject(ISubject<T> baseSubject)
    {
        _baseSubject = baseSubject;
    }

    public void OnCompleted()
    {
        _baseSubject.OnCompleted();
    }

    public void OnError(Exception error)
    {
        _baseSubject.OnError(error);
    }

    public void OnNext(T value)
    {
        _baseSubject.OnNext(value);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        Interlocked.Increment(ref _counter);
        return new CompositeDisposable(Disposable.Create(() => Interlocked.Decrement(ref _counter)),
                                       _baseSubject.Subscribe(observer));
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (!_disposed)
        {
            if (disposing)
            {
                _disposer.Dispose();
            }
            _disposed = true;
        }
    }
}
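
As a rough usage sketch, the wrapper above could be exercised as follows. It assumes the CountSubject<T> class is compiled into the same project and that the System.Reactive package is referenced. The CountSubjectDemo class and its lambdas are hypothetical names used only for illustration.

```csharp
using System;

public static class CountSubjectDemo
{
    public static void Main()
    {
        // Assumption: CountSubject<T> from the answer above is available here.
        using (var subject = new CountSubject<string>())
        {
            // Subscribe(Action<T>) is the Rx extension method; it wraps the lambda
            // in an observer and calls CountSubject<T>.Subscribe(IObserver<T>).
            IDisposable first = subject.Subscribe(v => Console.WriteLine("first: " + v));
            IDisposable second = subject.Subscribe(v => Console.WriteLine("second: " + v));
            Console.WriteLine(subject.Count); // 2

            // Disposing a subscription decrements the counter.
            first.Dispose();
            Console.WriteLine(subject.Count); // 1

            if (subject.Count > 0)
            {
                // Only the remaining subscriber receives this value.
                subject.OnNext("event");
            }

            second.Dispose();
            Console.WriteLine(subject.Count); // 0
        }
    }
}
```

Note that the counter only goes down when a subscription is disposed, so subscribers that are never disposed still count after OnCompleted or OnError. If all that is needed is the `> 0` check from the question, Subject<T> also exposes a HasObservers property, which may be enough without a wrapper.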

Upvotes: 5
