Reputation: 4104
I want to:
IObservable<T>
, but immediately start buffering any T
that is received (i.e. not yet seen by my IObserver<T>
).IObserver<T>
and continue It is quite important that the subscription is the first thing that happens.
In a 'marble diagram' form I am after something like this...
Time T+1 2 3 4 5 6 7 8
s1:IObservable<int> 1 2 3 4 5 6 7 8
s2:IObservable<bool> t
r: IObservable<int> 1 3 4 5 6 7 8
2
... in that at T+1 I subscribe to an IObservable<bool>
r
that itself is dependent upon IObservable<int>
s1
and IObservable<bool>
s2
. s1
is a stream that I don't control, s2
is one that I do control (a Subject), and publish
on when the work is done.
I thought that SkipUntil
would help me out, but that doesn't buffer the events that are received before the dependent IObservable
has completed.
Here's some code that I thought would work, but doesn't due to SkipUntil
not being a buffer.
var are = new AutoResetEvent(false);
var events = Observable.Generate(1, i => i < 12, i => i + 1, i => i, i => TimeSpan.FromSeconds(1));
events.Subscribe(x => Console.WriteLine("events:" + x), () => are.Set());
var subject = new Subject<int>();
var completed = subject.AsObservable().Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("Subscribing to events...");
events.SkipUntil(completed).Subscribe(x=> Console.WriteLine("events.SkipUntil(completed):"+ x));
Console.WriteLine("Subscribed.");
completed.Subscribe(x => Console.WriteLine("Completed"));
subject.OnNext(10);
are.WaitOne();
Console.WriteLine("Done");
I know about the various Buffer
methods but they don't seem appropriate in this case as I am not really buffering here, just coordinating activity at the start of my subscriptions.
UPDATE
I have generalised Enigmativity's response into the following extension method that might be useful:
public static class ObservableEx
{
public static IObservable<TSource> BufferUntil<TSource, TCompleted>(this IObservable<TSource> source, IObservable<TCompleted> completed)
{
var observable = Observable.Create<TSource>(o =>
{
var replaySubject = new ReplaySubject<TSource>();
var sub1 = source.Subscribe(replaySubject);
var query =
completed.Take(1).Select(
x => replaySubject.AsObservable());
var sub2 = query.Switch().Subscribe(o);
return new CompositeDisposable(sub1, sub2);
});
return observable;
}
}
Upvotes: 2
Views: 876
Reputation: 117064
This works for me:
var r = Observable.Create<int>(o =>
{
var rs = new ReplaySubject<int>();
var subscription1 = s1.Subscribe(rs);
var query = from f in s2.Take(1) select rs.AsObservable();
var subscription2 = query.Switch().Subscribe(o);
return new CompositeDisposable(subscription1, subscription2);
});
Upvotes: 4