Reputation: 27338
I have an Observable of IDisposable values:
IObservable<IDisposable> disposableValues = source.Select(val => new MyDisposableObject());
How do I write a pipeline that disposes the old value when:
1. a new value arrives, or
2. the observable completes?
I think #1 can be achieved with Buffer, but how do I access the last value in the onComplete callback?
disposableValues
    .Buffer(2)
    .Do(buffer => buffer[0].Dispose())
    .Finally(() => { /* ?? */ })
    .Subscribe();
Maybe I'm doing it wrong and I could use a CancellationToken or something...
Upvotes: 3
Views: 1009
Reputation: 117029
Like I said in the comment, I think it is a bit odd to have an IObservable<IDisposable>. It's hard to reason about the lifetime of the disposable in an observable pipeline like this.
What is more likely is that your observable needs a disposable object to do some work, and you want to ensure that object is disposed afterwards.
Let's assume you have this disposable:
public class MyDisposableObject : IDisposable
{
    public void DoSomething()
    {
        Console.WriteLine("DoSomething!");
    }
    private bool disposed = false;
    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                Console.WriteLine("Dispose!");
            }
            disposed = true;
        }
    }
    public void Dispose()
    {
        Dispose(true);
    }
}
It has something to do and it lets us know when it is disposed.
Now for the query.
var source = new Subject<Unit>();
IObservable<Unit> observable =
    source
        .SelectMany(u =>
            Observable.Using(
                () => new MyDisposableObject(),
                mdo => Observable.Start(() => mdo.DoSomething())));
observable.Subscribe();
It uses the Observable.Using operator to create the disposable and then start an observable that uses it. As soon as that inner observable completes, or the subscription is disposed, the disposable is disposed as well.
Try this test code:
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
source.OnCompleted();
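That gives me the following (Observable.Start runs the action asynchronously, so the exact interleaving of the lines can vary between runs):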
DoSomething!
Dispose!
DoSomething!
DoSomething!
Dispose!
Dispose!
Everything is nicely disposed and a clean observable is created with no side-effects.
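To see the clean-up behaviour of Observable.Using in isolation, here is a minimal sketch. It assumes the System.Reactive package is referenced; the class name UsingSketch and the log list are hypothetical names used only for illustration. The inner observable never completes, so the only thing that can trigger disposal here is disposing the subscription.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical helper class, not part of the original answer.
public static class UsingSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // The resource is created when someone subscribes and is tied to
        // the lifetime of that subscription.
        IObservable<int> observable = Observable.Using(
            () => Disposable.Create(() => log.Add("Dispose!")),
            _ => Observable.Never<int>());

        IDisposable subscription = observable.Subscribe();
        log.Add("Subscribed");

        // The inner sequence never terminates, so unsubscribing is what
        // releases the resource.
        subscription.Dispose();
        log.Add("Unsubscribed");

        return log;
    }
}
```

Calling UsingSketch.Run() should return "Subscribed", "Dispose!", "Unsubscribed" in that order, which shows that the resource does not outlive the subscription.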
Upvotes: 2
Reputation: 43409
What you probably want is something similar to the Do operator, but for the previous element. Below is an implementation of a custom DoOnPrevious operator:
/// <summary>Invokes an action for the previous element in the observable
/// sequence. The action for the last element is invoked when the observable
/// sequence completes.</summary>
private static IObservable<T> DoOnPrevious<T>(this IObservable<T> source,
    Action<T> onPrevious)
{
    return source
        .Select(x => (Item: x, HasValue: true))
        .Append((default, false))
        .Scan((previous, current) =>
        {
            if (previous.HasValue) onPrevious(previous.Item);
            return current;
        })
        .Where(entry => entry.HasValue)
        .Select(entry => entry.Item);
}
Usage example:
disposableValues
    .DoOnPrevious(x => x.Dispose())
    .Subscribe();
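To check the order in which the values get disposed, the operator can be exercised with a Subject. This is only a sketch: it assumes the System.Reactive package and a compiler that supports the default literal (C# 7.1 or later), it repeats the operator from above so that it compiles on its own, and DoOnPreviousSketch, Run and log are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical container class for the demo.
public static class DoOnPreviousSketch
{
    // Same operator as above, made public so the demo is self-contained.
    public static IObservable<T> DoOnPrevious<T>(this IObservable<T> source,
        Action<T> onPrevious)
    {
        return source
            .Select(x => (Item: x, HasValue: true))
            .Append((default, false))
            .Scan((previous, current) =>
            {
                if (previous.HasValue) onPrevious(previous.Item);
                return current;
            })
            .Where(entry => entry.HasValue)
            .Select(entry => entry.Item);
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();

        // Each number is mapped to a disposable that records its disposal.
        using (source
            .Select(n => Disposable.Create(() => log.Add($"Dispose {n}")))
            .DoOnPrevious(d => d.Dispose())
            .Subscribe())
        {
            for (int n = 1; n <= 3; n++)
            {
                log.Add($"OnNext {n}");
                source.OnNext(n);
            }
            log.Add("OnCompleted");
            source.OnCompleted();
        }
        return log;
    }
}
```

The log should read OnNext 1, OnNext 2, Dispose 1, OnNext 3, Dispose 2, OnCompleted, Dispose 3: each value is disposed when its successor arrives, and the last one when the sequence completes.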
Upvotes: 0
Reputation: 19546
Using Buffer() is the right way to get the "previous" object, but you should use Buffer(2,1) to get a "sliding window". Then you start your sequence with a default value (like null) and let it end with a default value (also null). The goal is to get a sequence like this:
[null, obj1]
[obj1, obj2]
[obj2, obj3]
[obj3, null]
You will have four events, and for three of them you can call Dispose().
For the first null you can use StartWith(null), which precedes your actual data. For the last null you use Concat() to append an observable that emits a single value, built from Observable.Never() and StartWith(). When your original observable completes, you will get a "fake" event in which you can dispose the last used object. Check the following example:
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class Program
{
    public class TestObject : IDisposable
    {
        public TestObject(int value) {
            this.Value = value;
        }
        public int Value { get; }
        public void Dispose()
        {
            Console.WriteLine($"Dispose called for {this.Value}");
        }
        public override string ToString()
        {
            return $"TestObject({this.Value})";
        }
    }
    public static void Main()
    {
        ISubject<TestObject> source = new Subject<TestObject>();
        // Pad the sequence with a leading and a trailing null so that every
        // real object appears once as the first element of a window.
        IDisposable subscription = source
            .StartWith((TestObject)null)
            .Concat(Observable.Never<TestObject>().StartWith((TestObject)null))
            .Buffer(2,1)
            .Subscribe(it => {
                Console.WriteLine("Array content: "+String.Join(", ", it));
                TestObject first = it.First();
                TestObject last = it.Last();
                if (first != null) {
                    Console.WriteLine($"Found an old object {first}, dispose it");
                    first.Dispose();
                } else {
                    Console.WriteLine("There is no previous object");
                }
                if (last != null) {
                    Console.WriteLine($"'Current' object is: {last}");
                } else {
                    Console.WriteLine("No 'current' object");
                }
            });
        source.OnNext(new TestObject(1));
        source.OnNext(new TestObject(2));
        source.OnNext(new TestObject(3));
        source.OnCompleted();
        Console.WriteLine("Finished");
        // The padded sequence never completes, so release it explicitly.
        subscription.Dispose();
    }
}
This will generate the following debug output:
Array content: , TestObject(1)
There is no previous object
'Current' object is: TestObject(1)
Array content: TestObject(1), TestObject(2)
Found an old object TestObject(1), dispose it
Dispose called for 1
'Current' object is: TestObject(2)
Array content: TestObject(2), TestObject(3)
Found an old object TestObject(2), dispose it
Dispose called for 2
'Current' object is: TestObject(3)
Array content: TestObject(3),
Found an old object TestObject(3), dispose it
Dispose called for 3
No 'current' object
Finished
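The same sliding-window idea can be wrapped in a reusable extension method. The following is only a sketch under a few assumptions: DisposePrevious, Tracked and DisposePreviousSketch are hypothetical names, the System.Reactive package is referenced, and the trailing null comes from Observable.Return() instead of Observable.Never().StartWith(), so that the resulting sequence completes. Because of that swap, Buffer(2,1) flushes one incomplete window on completion, which the sketch filters out.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical disposable that records its disposal in a shared log.
public sealed class Tracked : IDisposable
{
    private readonly List<string> log;
    public Tracked(int id, List<string> log) { Id = id; this.log = log; }
    public int Id { get; }
    public void Dispose() => log.Add($"Dispose {Id}");
}

public static class DisposePreviousSketch
{
    // Disposes each element once its successor arrives, and the last
    // element when the source completes.
    public static IObservable<T> DisposePrevious<T>(this IObservable<T> source)
        where T : class, IDisposable
    {
        return source
            .StartWith((T)null)                    // leading null: [null, obj1]
            .Concat(Observable.Return((T)null))    // trailing null: [objN, null]
            .Buffer(2, 1)                          // sliding window of two
            .Where(pair => pair.Count == 2)        // drop the partial window flushed on completion
            .Do(pair => pair[0]?.Dispose())        // dispose the "previous" object
            .Select(pair => pair[1])
            .Where(current => current != null);    // hide the trailing null from subscribers
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<Tracked>();
        using (source.DisposePrevious()
            .Subscribe(current => log.Add($"Current {current.Id}")))
        {
            source.OnNext(new Tracked(1, log));
            source.OnNext(new Tracked(2, log));
            source.OnNext(new Tracked(3, log));
            source.OnCompleted();
        }
        return log;
    }
}
```

With this wrapper the subscriber only sees the real objects, and the log should read Current 1, Dispose 1, Current 2, Dispose 2, Current 3, Dispose 3.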
Upvotes: 0