Reputation: 143
Primer; throughout my code base I often need to work with pooled blocks of memory. This is done for performance reasons to reduce garbage collections (making a real time video game engine component). I handle this by exposing the types as IDisposableValue where you can access T Value only until the wrapper is disposed. You dispose the wrapper to return the value to the pool to be reused.
I build up streams of data processing that work with these wrapped values in response to events occurring over time. This would normally be a perfect candidate for Observables/Reactive Extensions, except that having to dispose the wrapper is inherently a form of mutability, something you don't want when being reactive. If one subscriber disposes the wrapper when they finish with it but a second observer is still working with it the wrapper will throw an exception.
Intended Goal: Have each subscriber receive a separate wrapper over the original real wrapped value. The underlying value will then only be disposed once each subscriber disposes their individual wrapper (think RefCountDisposable). Thus each subscriber can work with the value for as long as they need and they signify they are done by disposing. When all of them complete the value is released back to the pool.
Only problem is I have no idea how to implement this properly in RX. Is this the appropriate way to handle my situation and if so any pointers on how to actually implement it?
Edit 1 - Dirty Solution using ISubject:
I tried to make it work using various combinations of Observable.Select/Create/Defer but could not get Intended Goal above to work doing so. Instead I had to turn to uses Subjects, which I know is shunned upon. Here's my current code.
public class SharedDisposableValueSubject<T> : AbstractDisposable, ISubject<IDisposableValue<T>>
{
private readonly Subject<SharedDisposable> subject;
private readonly SubscriptionCounter<SharedDisposable> counter;
private readonly IObservable<IDisposableValue<T>> observable;
public SharedDisposableValueSubject()
{
this.subject = new Subject<SharedDisposable>();
this.counter = new SubscriptionCounter<SharedDisposable>(this.subject);
this.observable = this.counter.Source.Select(value => value.GetValue());
}
/// <inheritdoc />
public void OnCompleted() => this.subject.OnCompleted();
/// <inheritdoc />
public void OnError(Exception error) => this.subject.OnError(error);
/// <inheritdoc />
public void OnNext(IDisposableValue<T> value) =>
this.subject.OnNext(new SharedDisposable(value, this.counter.Count));
/// <inheritdoc />
public IDisposable Subscribe(IObserver<IDisposableValue<T>> observer) => this.observable.Subscribe(observer);
/// <inheritdoc />
protected override void ManagedDisposal() => this.subject.Dispose();
private class SharedDisposable
{
private readonly IDisposableValue<T> value;
private readonly AtomicInt count;
public SharedDisposable(IDisposableValue<T> value, int count)
{
Contracts.Requires.That(count >= 0);
this.value = value;
this.count = new AtomicInt(count);
if (count == 0)
{
this.value?.Dispose();
}
}
public IDisposableValue<T> GetValue() => new ValuePin(this);
private class ValuePin : AbstractDisposable, IDisposableValue<T>
{
private readonly SharedDisposable parent;
public ValuePin(SharedDisposable parent)
{
Contracts.Requires.That(parent != null);
this.parent = parent;
}
/// <inheritdoc />
public T Value => this.parent.value != null ? this.parent.value.Value : default(T);
/// <inheritdoc />
protected override void ManagedDisposal()
{
if (this.parent.count.Decrement() == 0)
{
this.parent.value?.Dispose();
}
}
}
}
}
public class SubscriptionCounter<T>
{
private readonly AtomicInt count = new AtomicInt(0);
public SubscriptionCounter(IObservable<T> source)
{
Contracts.Requires.That(source != null);
this.Source = Observable.Create<T>(observer =>
{
this.count.Increment();
return new Subscription(source.Subscribe(observer), this.count);
});
}
public int Count => this.count.Read();
public IObservable<T> Source { get; }
private class Subscription : AbstractDisposable
{
private readonly IDisposable subscription;
private readonly AtomicInt count;
public Subscription(IDisposable subscription, AtomicInt count)
{
Contracts.Requires.That(subscription != null);
Contracts.Requires.That(count != null);
this.subscription = subscription;
this.count = count;
}
/// <inheritdoc />
protected override void ManagedDisposal()
{
this.subscription.Dispose();
this.count.Decrement();
}
}
}
public interface IDisposableValue<out T> : IDisposable
{
bool IsDisposed { get; }
T Value { get; }
}
AbstractDisposable is just a base class implementation of the disposable pattern for types that don't hold onto unmanaged types. It ensures ManagedDisposal() is only ever called a single time the first time Dispose() is called. AtomicInt is a wrapper over Interlocked on an int to provide threadsafe atomic updates to an int.
My test code showing how SharedDisposableValueSubject is expected to be used;
public static class SharedDisposableValueSubjectTests
{
[Fact]
public static void NoSubcribersValueAutoDisposes()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var sourceValue = new DisposableWrapper<int>(0);
sourceValue.IsDisposed.Should().BeFalse();
subject.OnNext(sourceValue);
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void SingleSurcriber()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved = null;
subject.Subscribe(value => retrieved = value);
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue);
retrieved.Should().NotBeNull();
retrieved.Value.Should().Be(testNumber);
retrieved.IsDisposed.Should().BeFalse();
sourceValue.IsDisposed.Should().BeFalse();
// disposing retrieved disposes the source value
retrieved.Dispose();
retrieved.IsDisposed.Should().BeTrue();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void ManySubcribers()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved1 = null;
subject.Subscribe(value => retrieved1 = value);
IDisposableValue<int> retrieved2 = null;
subject.Subscribe(value => retrieved2 = value);
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue);
retrieved1.Should().NotBeNull();
retrieved1.Value.Should().Be(testNumber);
retrieved1.IsDisposed.Should().BeFalse();
retrieved2.Should().NotBeNull();
retrieved2.Value.Should().Be(testNumber);
retrieved2.IsDisposed.Should().BeFalse();
sourceValue.IsDisposed.Should().BeFalse();
// disposing only 1 retrieved value does not yet dispose the source value
retrieved1.Dispose();
retrieved1.IsDisposed.Should().BeTrue();
retrieved2.IsDisposed.Should().BeFalse();
retrieved2.Value.Should().Be(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
// disposing both retrieved values disposes the source value
retrieved2.Dispose();
retrieved2.IsDisposed.Should().BeTrue();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void DisposingManyTimesStillRequiresEachSubscriberToDispose()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved1 = null;
subject.Subscribe(value => retrieved1 = value);
IDisposableValue<int> retrieved2 = null;
subject.Subscribe(value => retrieved2 = value);
subject.OnNext(sourceValue);
// disposing only 1 retrieved value does not yet dispose the source value
// even though the retrieved value is disposed many times
retrieved1.Dispose();
retrieved1.Dispose();
retrieved1.Dispose();
retrieved1.IsDisposed.Should().BeTrue();
retrieved2.IsDisposed.Should().BeFalse();
sourceValue.IsDisposed.Should().BeFalse();
// disposing both retrieved values disposes the source value
retrieved2.Dispose();
retrieved2.IsDisposed.Should().BeTrue();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void SingleSubcriberUnsubcribes()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
var subscription = subject.Subscribe(value => { });
subscription.Dispose();
// source value auto disposes because no subscribers
subject.OnNext(sourceValue);
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void SubcriberUnsubcribes()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved = null;
subject.Subscribe(value => retrieved = value);
var subscription = subject.Subscribe(value => { });
subscription.Dispose();
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue);
retrieved.Should().NotBeNull();
retrieved.Value.Should().Be(testNumber);
retrieved.IsDisposed.Should().BeFalse();
sourceValue.IsDisposed.Should().BeFalse();
// disposing retrieved causes source to be disposed
retrieved.Dispose();
retrieved.IsDisposed.Should().BeTrue();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static async Task DelayedSubcriberAsync()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
// delay countdown event used just to ensure that the value isn't disposed until assertions checked
var delay = new AsyncCountdownEvent(1);
var disposed = new AsyncCountdownEvent(2);
subject.Delay(TimeSpan.FromSeconds(1)).Subscribe(async value =>
{
await delay.WaitAsync().DontMarshallContext();
value.Dispose();
disposed.Signal(1);
});
subject.Subscribe(value =>
{
value.Dispose();
disposed.Signal(1);
});
// value is not yet disposed
subject.OnNext(sourceValue);
sourceValue.IsDisposed.Should().BeFalse();
// wait for value to be disposed
delay.Signal(1);
await disposed.WaitAsync().DontMarshallContext();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void MultipleObservedValues()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber1 = 1;
var sourceValue1 = new DisposableWrapper<int>(testNumber1);
sourceValue1.IsDisposed.Should().BeFalse();
var testNumber2 = 2;
var sourceValue2 = new DisposableWrapper<int>(testNumber2);
sourceValue2.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved = null;
subject.Subscribe(value => retrieved = value);
// first test value
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue1);
retrieved.Should().NotBeNull();
retrieved.Value.Should().Be(testNumber1);
retrieved.IsDisposed.Should().BeFalse();
sourceValue1.IsDisposed.Should().BeFalse();
// disposing retrieved disposes the source value
retrieved.Dispose();
retrieved.IsDisposed.Should().BeTrue();
sourceValue1.IsDisposed.Should().BeTrue();
// second test value
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue2);
retrieved.Should().NotBeNull();
retrieved.Value.Should().Be(testNumber2);
retrieved.IsDisposed.Should().BeFalse();
sourceValue2.IsDisposed.Should().BeFalse();
// disposing retrieved disposes the source value
retrieved.Dispose();
retrieved.IsDisposed.Should().BeTrue();
sourceValue2.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
}
All of these pass, but I realize there's a lot of things you can do with an observable so there might be use cases I hadn't considered that could break this implementation. If you know of any problems please let me know. It might also just be the case that I'm trying to make Rx do something it's inherently not meant to do.
Edit 2 - Solution using Publish:
I use Publish in order to wrap the disposable values from the original observable in SharedDisposable, guaranteeing that each original value gets wrapped only once. Then the published observable is subscriber counted and each subscriber gets a separate ValuePin that when disposed decrements the count on the SharedDisposable. When the SharedDisposable count reaches 0 it disposes the original value.
I tried not doing the subscription counting and instead having it so each time a ValuePin is given out it increments the count, but I couldn't find a way to guarantee it'd create the ValuePins for each subscriber before allowing the subscribers to dispose them. This resulted in the subscriber 1 getting their pin, count goes from 0 to 1, then disposing that pin before subscriber 2 gets their pin, count goes from 1 to 0 triggering the original value to be disposed, and then subscriber 2 would should receive a pin but it's too late now.
public static IObservable<IDisposableValue<T>> ShareDisposable<T>(this IObservable<IDisposableValue<T>> source)
{
Contracts.Requires.That(source != null);
var published = source.Select(value => new SharedDisposable<T>(value)).Publish();
var counter = new SubscriptionCounter<SharedDisposable<T>>(published);
published.Connect();
return counter.CountedSource.Select(value => value.GetValue(counter.Count));
}
private class SharedDisposable<T>
{
private const int Uninitialized = -1;
private readonly IDisposableValue<T> value;
private readonly AtomicInt count;
public SharedDisposable(IDisposableValue<T> value)
{
this.value = value;
this.count = new AtomicInt(Uninitialized);
}
public IDisposableValue<T> GetValue(int subscriberCount)
{
Contracts.Requires.That(subscriberCount >= 0);
this.count.CompareExchange(subscriberCount, Uninitialized);
return new ValuePin(this);
}
private class ValuePin : AbstractDisposable, IDisposableValue<T>
{
private readonly SharedDisposable<T> parent;
public ValuePin(SharedDisposable<T> parent)
{
Contracts.Requires.That(parent != null);
this.parent = parent;
}
/// <inheritdoc />
public T Value => this.parent.value != null ? this.parent.value.Value : default(T);
/// <inheritdoc />
protected override void ManagedDisposal()
{
if (this.parent.count.Decrement() == 0)
{
this.parent.value?.Dispose();
}
}
}
}
This certainly seems better since I'm not having to use Subjects in any way, although the subscriber counting feels dirty. Especially because of the way I need to have the count be uninitialized until the first ValuePin is given out. And to be clear, I'm trying to handle the disposal of values produced by an observable that will be shared by 0 to many subscribers, not the disposal of connections to an observable itself which is why I'm not using RefCount instead of Connect.
Upvotes: 1
Views: 364
Reputation: 10783
I think you could refcount the disposable. This would require the publisher to initiate the reference counting and then each subscriber to increment and decrement the counter. You could use RefCountDisposable to do this. I would only consider doing this for private/internal code else you could have a leaky consumer break your system. Alternative solution to Rx could be to look at the Disruptor pattern.
Upvotes: 2