James Webster
James Webster

Reputation: 4104

How to subscribe to, but buffer data from, an IObservable until another IObservable has published?

I want to:

  1. Immediately subscribe to an IObservable<T>, but immediately start buffering any T that is received (i.e. not yet seen by my IObserver<T>).
  2. Do some work.
  3. When the work is completed, flush the buffer to my IObserver<T> and continue

It is quite important that the subscription is the first thing that happens.

In a 'marble diagram' form I am after something like this...

Time                  T+1   2   3   4   5   6   7   8
s1:IObservable<int>     1   2   3   4   5   6   7   8  
s2:IObservable<bool>          t    
r: IObservable<int>           1 3   4   5   6   7   8
                              2

... in that at T+1 I subscribe to an IObservable<bool> r that itself is dependent upon IObservable<int> s1 and IObservable<bool> s2. s1 is a stream that I don't control, s2 is one that I do control (a Subject), and publish on when the work is done.

I thought that SkipUntil would help me out, but that doesn't buffer the events that are received before the dependent IObservable has completed.

Here's some code that I thought would work, but doesn't due to SkipUntil not being a buffer.

        var are = new AutoResetEvent(false);
        var events = Observable.Generate(1, i => i < 12, i => i + 1, i => i, i => TimeSpan.FromSeconds(1));

        events.Subscribe(x => Console.WriteLine("events:" + x), () => are.Set());

        var subject = new Subject<int>();
        var completed = subject.AsObservable().Delay(TimeSpan.FromSeconds(5));

        Console.WriteLine("Subscribing to events...");

        events.SkipUntil(completed).Subscribe(x=> Console.WriteLine("events.SkipUntil(completed):"+ x));
        Console.WriteLine("Subscribed.");

        completed.Subscribe(x => Console.WriteLine("Completed"));

        subject.OnNext(10);

        are.WaitOne();
        Console.WriteLine("Done");

I know about the various Buffer methods but they don't seem appropriate in this case as I am not really buffering here, just coordinating activity at the start of my subscriptions.

UPDATE

I have generalised Enigmativity's response into the following extension method that might be useful:

public static class ObservableEx
{
    public static IObservable<TSource> BufferUntil<TSource, TCompleted>(this IObservable<TSource> source, IObservable<TCompleted> completed)
    {
        var observable = Observable.Create<TSource>(o =>
        {
            var replaySubject = new ReplaySubject<TSource>();
            var sub1 = source.Subscribe(replaySubject);
            var query =
                completed.Take(1).Select(
                    x => replaySubject.AsObservable());
            var sub2 = query.Switch().Subscribe(o);
            return new CompositeDisposable(sub1, sub2);
        });
        return observable;
    }        
}

Upvotes: 2

Views: 876

Answers (1)

Enigmativity
Enigmativity

Reputation: 117064

This works for me:

var r = Observable.Create<int>(o =>
{
    var rs = new ReplaySubject<int>();
    var subscription1 = s1.Subscribe(rs);
    var query = from f in s2.Take(1) select rs.AsObservable();
    var subscription2 = query.Switch().Subscribe(o);
    return new CompositeDisposable(subscription1, subscription2);
});

Upvotes: 4

Related Questions