deafjeff

Reputation: 774

Rx Buffer that runs once and then completes

I want to collect the values a stream emits during its first 3 seconds and then concatenate them with another stream of the same data type. The first observable is a separate, finite chunk of data that must complete in every case, so that I can join the first and second streams using Concat(). Using Concat() is required to preserve the integrity of the data stream.

            private IObservable<3DPoints> _1stBuffer = Observable.Empty<3DPoints>();
            ..

           _1stBuffer = someRawStreamObservableReceivingOnNext
                .Buffer(TimeSpan.FromSeconds(3), 100)
                .Where(item => item.Any())
                .SelectMany(item => item);


// later 
            var streamObservable = _1stBuffer.Concat(_some2ndStream); // doesn't emit, since 1stBuffer doesn't complete

I tried this:

            _1stBuffer = someRawStreamObservableReceivingOnNext

                // This completes the observable, but I want the buffer collected so far, not an empty sequence!
                .Timeout(TimeSpan.FromSeconds(3), Observable.Empty<3DPoints>())

                .Buffer(TimeSpan.FromSeconds(3), 100)
                .Where(item => item.Any())
                .SelectMany(item => item);

The timeout makes the buffer observable complete, so Concat() proceeds, but continuing with Observable.Empty<3DPoints>() is not an option. How can I capture the first buffer, filled for 3 seconds, and concatenate it with the second stream?

Upvotes: 0

Views: 152

Answers (1)

Shlomo

Reputation: 14350

You're probably looking for one of the Take variants:

  • source.Take(1) will take the first element from source, then complete.
  • source.TakeUntil(otherObservable) will take from source until otherObservable emits its first element, then complete.
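
For the 3-second case in the question, a minimal sketch of the TakeUntil variant could look like the following. The helper name FirstWindowThen, its parameters, and the Demo method are hypothetical and only for illustration; Observable.Timer serves as the "other" observable that ends the first chunk. It assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class FirstWindowExample
{
    // Hypothetical helper: passes through whatever `source` emits during
    // `window`, completes that first chunk, then continues with `second`.
    public static IObservable<T> FirstWindowThen<T>(
        IObservable<T> source,
        IObservable<T> second,
        TimeSpan window,
        IScheduler scheduler)
    {
        // Observable.Timer emits a single value after `window`; TakeUntil
        // completes the first chunk at that moment, so Concat can move on.
        var firstChunk = source.TakeUntil(Observable.Timer(window, scheduler));
        return firstChunk.Concat(second);
    }

    // Illustrative usage with stand-in streams (not the question's real data).
    public static void Demo()
    {
        // Stand-in for the raw stream: a value every 700 ms, never completes.
        var raw = Observable.Interval(TimeSpan.FromMilliseconds(700));
        // Stand-in for the second stream.
        var second = Observable.Range(100, 3).Select(i => (long)i);

        var all = FirstWindowThen(raw, second, TimeSpan.FromSeconds(3), Scheduler.Default)
            .ToList()
            .Wait(); // blocks until the combined stream completes

        foreach (var value in all)
        {
            Console.WriteLine(value);
        }
    }
}
```

With the names from the question, this would read roughly as someRawStreamObservableReceivingOnNext.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(3))).Concat(_some2ndStream). Note that the timer starts when the resulting observable is subscribed to, not when it is declared. Rx.NET also has a Take(TimeSpan) overload that should give the same effect without the explicit timer.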

Upvotes: 1
