Liero

Reputation: 27338

Observable of IDisposable object: How to dispose previous value onNext and onComplete?

I have an observable of IDisposable values:

IObservable<IDisposable> disposableValues = source.Select(val => new MyDisposableObject());

How do I write a pipeline that disposes the old value when:

  1. a new value is emitted
  2. the source completes?

I think #1 can be achieved with Buffer, but how do I access the last value in the onComplete callback?

disposableValues
    .Buffer(2)
    .Do(buffer => buffer[0].Dispose())
    .Finally(_ => { ??})
    .Subscribe();

Maybe I'm doing it wrong and I could use CancellationToken or something...

Upvotes: 3

Views: 1009

Answers (3)

Enigmativity

Reputation: 117029

Like I said in the comment, I think it is a bit odd to have an IObservable<IDisposable>. It's hard to reason about the lifetime of the disposable in an observable pipeline like this.

What is more likely is that you have an observable that needs to use a disposable object that you want to do something with and you want to ensure it is disposed afterwards.

Let's assume you have this disposable:

public class MyDisposableObject : IDisposable
{
    public void DoSomething()
    {
        Console.WriteLine("DoSomething!");
    }
    
    private bool disposed = false;

    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                Console.WriteLine("Dispose!");
            }
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
    }
}

It has something to do and it lets us know when it disposes.

Now for the query.

var source = new Subject<Unit>();

IObservable<Unit> observable =
    source
        .SelectMany(u =>
            Observable.Using(
                () => new MyDisposableObject(),
                mdo => Observable.Start(() => mdo.DoSomething())));
        
observable.Subscribe();

It uses the Observable.Using operator to create the disposable and then starts an observable that uses it. Each disposable is disposed as soon as its inner observable terminates, so once the source has completed and the inner observables have finished, everything has been disposed.

Try this test code:

source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
source.OnCompleted();

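That gives me (the exact interleaving can vary between runs, because Observable.Start runs each action asynchronously):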

DoSomething!
Dispose!
DoSomething!
DoSomething!
Dispose!
Dispose!

Everything is nicely disposed and a clean observable is created with no side-effects.
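
To look at the lifetime behaviour of Observable.Using in isolation, here is a minimal sketch, assuming the System.Reactive package is referenced. The TrackedResource class and the UsingDemo wrapper are hypothetical names introduced only for illustration, and the inner sequence is a synchronous Observable.Return instead of Observable.Start, so the result does not depend on thread-pool timing.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical resource that records whether it has been disposed.
public sealed class TrackedResource : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() => IsDisposed = true;
}

public static class UsingDemo
{
    // Subscribes the given number of times and returns every resource that was created.
    public static List<TrackedResource> Run(int subscriptions)
    {
        var created = new List<TrackedResource>();

        // Observable.Using calls the resource factory once per subscription.
        IObservable<int> query = Observable.Using(
            () =>
            {
                var resource = new TrackedResource();
                created.Add(resource);
                return resource;
            },
            resource => Observable.Return(42));

        for (int i = 0; i < subscriptions; i++)
        {
            // The inner sequence completes synchronously, which releases the resource.
            query.Subscribe(_ => { });
        }

        return created;
    }
}
```

Calling UsingDemo.Run(3) should hand back three resources, each already disposed, which mirrors the one-disposable-per-element behaviour of the SelectMany query above.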

Upvotes: 2

Theodor Zoulias

Reputation: 43409

What you probably want is something similar to the Do operator, but for the previous element. Below is an implementation of a custom DoOnPrevious operator:

/// <summary>Invokes an action for the previous element in the observable
/// sequence. The action for the last element is invoked when the observable
/// sequence completes.</summary>
private static IObservable<T> DoOnPrevious<T>(this IObservable<T> source,
    Action<T> onPrevious)
{
    return source
        .Select(x => (Item: x, HasValue: true))
        .Append((default, false))
        .Scan((previous, current) =>
        {
            if (previous.HasValue) onPrevious(previous.Item);
            return current;
        })
        .Where(entry => entry.HasValue)
        .Select(entry => entry.Item);
}

Usage example:

disposableValues
    .DoOnPrevious(x => x.Dispose())
    .Subscribe();
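
As written, DoOnPrevious appears to run the action for the last element only when the sequence completes normally, since Append adds its marker only on completion. If the last element should also be released on error or when the subscriber unsubscribes, one alternative is a sketch built on SerialDisposable, which disposes its previous content whenever a new one is assigned. This is not the operator above but a different technique; DisposePrevious and DisposePreviousExtensions are hypothetical names, and the System.Reactive package is assumed.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class DisposePreviousExtensions
{
    // Hypothetical operator: forwards every element and keeps only the latest one alive.
    public static IObservable<T> DisposePrevious<T>(this IObservable<T> source)
        where T : IDisposable
    {
        return Observable.Create<T>(observer =>
        {
            var current = new SerialDisposable();

            var subscription = source.Subscribe(
                x =>
                {
                    // Assigning a new value disposes the element held before it.
                    current.Disposable = x;
                    observer.OnNext(x);
                },
                ex =>
                {
                    current.Dispose();
                    observer.OnError(ex);
                },
                () =>
                {
                    current.Dispose();
                    observer.OnCompleted();
                });

            // Unsubscribing also disposes whichever element is currently held.
            return new CompositeDisposable(subscription, current);
        });
    }
}
```

Usage would look the same as before: disposableValues.DisposePrevious().Subscribe();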

Upvotes: 0

Progman

Reputation: 19546

Using Buffer() is the right way to get the "previous" object, but you should use Buffer(2,1) to get a "sliding window". Then you start your subscription with a default value (like null) AND let it end with a default value (also null). The goal is to get a sequence like this:

[null, obj1]
[obj1, obj2]
[obj2, obj3]
[obj3, null]

You will have four events, and for three of them you can call Dispose(). For the first null you can use StartWith(null), which precedes your actual data. For the last null you use Concat() to append an observable that emits only one value, built from Observable.Never() and StartWith(). When your original observable completes you will get a "fake" event where you can dispose your last used object. Check the following example:

using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class Program
{
    public class TestObject : IDisposable
    {
        public TestObject(int value) {
            this.Value = value;
        }

        public int Value { get; }

        public void Dispose()
        {
            Console.WriteLine($"Dispose called for {this.Value}");
        }

        public override string ToString()
        {
            return $"TestObject({this.Value})";
        }
    }

    public static void Main()
    {
        ISubject<TestObject> source = new Subject<TestObject>();
        IDisposable subscription = source
            // Leading null so the first real object has a "previous" slot.
            .StartWith((TestObject)null)
            // Trailing null, emitted once the source completes.
            .Concat(Observable.Never<TestObject>().StartWith((TestObject)null))
            // Sliding window of two elements: [previous, current].
            .Buffer(2,1)
            .Subscribe(it => {
                Console.WriteLine("Array content: "+String.Join(", ", it));
                TestObject first = it.First();
                TestObject last = it.Last();
                if (first != null) {
                    Console.WriteLine($"Found an old object {first}, dispose it");
                    first.Dispose();
                } else {
                    Console.WriteLine("There is no previous object");
                }

                if (last != null) {
                    Console.WriteLine($"'Current' object is: {last}");
                } else {
                    Console.WriteLine("No 'current' object");
                }
            });
        source.OnNext(new TestObject(1));
        source.OnNext(new TestObject(2));
        source.OnNext(new TestObject(3));
        source.OnCompleted();
        Console.WriteLine("Finished");
    }
}

This will generate the following debug output:

Array content: , TestObject(1)
There is no previous object
'Current' object is: TestObject(1)
Array content: TestObject(1), TestObject(2)
Found an old object TestObject(1), dispose it
Dispose called for 1
'Current' object is: TestObject(2)
Array content: TestObject(2), TestObject(3)
Found an old object TestObject(2), dispose it
Dispose called for 2
'Current' object is: TestObject(3)
Array content: TestObject(3), 
Found an old object TestObject(3), dispose it
Dispose called for 3
No 'current' object
Finished
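
One detail of the sliding window is worth seeing on its own: when its source completes, Buffer(2,1) also flushes the window that is still only partly filled. The sketch below collects the windows for a plain integer sequence; SlidingWindowDemo and Pairs are hypothetical names, and the System.Reactive package is assumed. This is presumably why the example above ends with Observable.Never(): the combined sequence never completes, so no one-element window shows up after [obj3, null].

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class SlidingWindowDemo
{
    // Collects every window produced by Buffer(2, 1) for a finite source.
    public static IList<IList<int>> Pairs(IObservable<int> source)
    {
        return source
            .Buffer(2, 1)   // window size 2, advancing by 1 element
            .ToList()       // gather all windows into a single list
            .Wait();        // block until the source completes
    }
}
```

For Observable.Range(1, 3) this should yield three windows, [1, 2], [2, 3] and a final partial [3], so a handler that reads both ends of the window needs to tolerate a window with a single element whenever the sequence can complete.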

Upvotes: 0
