There is a way to wrap an event as an observable using Observable.FromEvent. E.g. this class:
    class Generator<T>
    {
        event Action<T> onPush;

        public IObservable<T> Items =>
            Observable.FromEvent<T>(d => onPush += d, d => onPush -= d);

        public void Push(T item) => onPush?.Invoke(item);
    }
However, I haven't found a way to complete the observable from an event as well. How can I do that?
Update:
To clarify what I mean: the class above produces an IObservable<T> which is "endless" and never completes. I want it to be completed by another event, not to create another observable. So the question can be reduced to this:
How can an arbitrary IObservable<T> be completed prematurely, i.e. have its OnCompleted notification called?
Upvotes: 0
Views: 662
An observable represents a stream of notifications, or events. An observable that is sourced from an event is inherently endless. The observable connects to the event, referencing the object, so the object backing the event will never go out of scope. .NET/C# doesn't provide a way to indicate that an event will never be raised again, so an observable connected directly to the event never completes.
This is not uncommon: most event-based observables never have OnCompleted called explicitly, modelling the real world, where it is quite hard to say definitively that something will never happen again.
However, this isn't a problem: observables are allowed to run indefinitely, and doing so causes no harm. An observable without subscribers doesn't take up many resources. If you're no longer interested in an event-sourced observable, unsubscribe (dispose) all of its subscriptions and you're fine.
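For reference, unsubscribing by hand means disposing the IDisposable that Subscribe returns. Here is a minimal sketch, assuming the System.Reactive package is referenced; the Subject<int> is only a stand-in for an endless source, and the UnsubscribeSketch class name is made up for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class UnsubscribeSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();   // stand-in for any endless source

        IDisposable subscription = source.Subscribe(
            i => log.Add($"OnNext: {i}"),
            () => log.Add("OnCompleted"));

        source.OnNext(1);
        subscription.Dispose();            // unsubscribe: no further notifications
        source.OnNext(2);                  // not observed by the disposed subscription

        return log;                        // contains only "OnNext: 1"
    }
}
```

Note that disposing a subscription does not raise OnCompleted on the observer; it only stops the notifications.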
One way to unsubscribe automatically is with one of the Take operators, such as TakeUntil (as mentioned below). Try the following code (using your Generator class):
    var g = new Generator<int>();
    g.Items
        .TakeUntil(i => i > 3)
        .Subscribe(
            i => Console.WriteLine($"OnNext: {i}"),
            e => Console.WriteLine($"OnError: Message: {e.Message}"),
            () => Console.WriteLine("OnCompleted")
        );
    g.Push(1);
    g.Push(2);
    g.Push(3);
    g.Push(4);
    g.Push(5);
    g.Push(6);
Output:
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnCompleted
TakeUntil unsubscribes from the Items observable once a value greater than 3 arrives. The value that satisfies the predicate (4) is still delivered, and OnCompleted follows immediately, which is why there are no 5 and 6 messages.
Also, as Enigmativity mentioned, your Generator<T> class is basically the same as Subject<T>; I suggest you use that.
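As a rough illustration of that suggestion, here is a sketch of a generator backed by Subject<T>, assuming the System.Reactive package is referenced. The SubjectGenerator<T> name and its Complete method are hypothetical and not part of the original class:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical variant of Generator<T> backed by Subject<T>.
public sealed class SubjectGenerator<T>
{
    private readonly Subject<T> subject = new Subject<T>();

    // AsObservable hides the subject so subscribers cannot push or complete it.
    public IObservable<T> Items => subject.AsObservable();

    public void Push(T item) => subject.OnNext(item);

    // Sends OnCompleted to all current subscribers; later pushes are ignored.
    public void Complete() => subject.OnCompleted();
}
```

With this shape, calling Complete() ends the sequence for every subscriber, so no separate completion event is needed.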
Original answer:
Make another observable from the event, then use .TakeUntil:
    class Generator<T>
    {
        event Action<T> onPush;
        event Action<Unit> onCompleted;

        public IObservable<T> Items =>
            Observable.FromEvent<T>(d => onPush += d, d => onPush -= d)
                .TakeUntil(Completion);

        public IObservable<Unit> Completion =>
            Observable.FromEvent<Unit>(d => onCompleted += d, d => onCompleted -= d);

        public void Push(T item) => onPush?.Invoke(item);
        public void Complete() => onCompleted?.Invoke(Unit.Default);
    }
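The same pattern should work for any IObservable<T>, not only one built from an event: pipe it through TakeUntil with a second observable that acts as the stop signal. Below is a minimal sketch, assuming the System.Reactive package is referenced; both subjects and the TakeUntilSketch class name are stand-ins chosen for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class TakeUntilSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();   // stand-in for an arbitrary endless source
        var stop = new Subject<Unit>();    // hypothetical stop signal

        source
            .TakeUntil(stop)               // completes as soon as 'stop' emits
            .Subscribe(
                i => log.Add($"OnNext: {i}"),
                () => log.Add("OnCompleted"));

        source.OnNext(1);
        source.OnNext(2);
        stop.OnNext(Unit.Default);         // triggers OnCompleted downstream
        source.OnNext(3);                  // not observed

        return log;                        // OnNext: 1, OnNext: 2, OnCompleted
    }
}
```

The source itself is left untouched here; only the sequence seen through TakeUntil completes.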
Upvotes: 2