brolly87

Reputation: 21

Rx .NET - force buffer to emit

I want to use Rx Buffer functionality:

var source = new Subject<Price>();
var buffer = source
    .Buffer(TimeSpan.FromSeconds(30), 5)
    .Where(p => p.Any());

This means a buffer is emitted (published to subscribers) when it reaches 5 items or when 30 seconds have passed since the last emission.

But I also need to be able to emit on demand, for example when I receive a high-priority item. In that case I want to add it to the observable (source.OnNext()) and somehow force the buffer to emit, that is, return all elements currently in the buffer and clear it.

I know that I can add the following code:

var flusher = new Subject<Price>();
var closing = flusher.Select(x => new List<Price> {x});
var query = buffer.Merge(closing).Subscribe(something);

Then I can invoke flusher.OnNext(highPriorityItem) and that item is emitted immediately.

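But in this case I have two independent sequences, each emitting separately: the high-priority item is emitted on its own, while the items already buffered stay in the buffer. I need a single emission when the buffer is full or when a specific item appears in the sequence.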

The questions "Force flush count-type Observable.Buffer c#" and "Force flush to Observable.Buffer c#" don't seem to fit my case.

Upvotes: 2

Views: 1969

Answers (2)

Enigmativity

Reputation: 117029

I think decPL has the basic idea right here, but his solution isn't stable. Depending on the scheduler of the input observable you can get unpredictable results even if it's subscribed in the right order. That's because there are multiple independent subscriptions to input. You need to push this all through a .Publish(...) call to ensure only one subscription.

It also needs a way of cleaning up when the subscription is disposed, so it has to run through a .Create(...) call as well.

Here's how:

var input = new Subject<Price>();

IObservable<IList<Price>> query =
    input
        .Publish(i =>
            Observable
                .Create<IList<Price>>(o =>
                {
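                    // Closes the current window after 10 seconds.
                    var timeBuffer =
                        Observable
                            .Timer(TimeSpan.FromSeconds(10.0))
                            .Select(n => Unit.Default);
                    // Closes the current window when an important price arrives.
                    var flush =
                        i
                            .Where(p => p.IS_IMPORTANT)
                            .Select(n => Unit.Default);
                    // Closes the current window once it has received 5 prices.
                    var sizeBuffer =
                        i
                            .Buffer(5)
                            .Select(l => Unit.Default);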
                    return
                        i
                            .Window(() => Observable.Merge(timeBuffer, sizeBuffer, flush))
                            .SelectMany(w => w.ToList())
                            .Subscribe(o);
                }));

query.Subscribe(w => DO_SOMETHING_WITH_PRICES(w));
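
For reuse, the same query can be wrapped in an extension method. The following is only a sketch of that idea, not part of Rx.NET: the names FlushableBufferExtensions, BufferWithFlush and flushOn are made up for illustration, and the final Where that drops empty batches is borrowed from the question rather than from the query above. It assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class FlushableBufferExtensions
{
    // Hypothetical helper (not an Rx.NET operator): emits a batch when
    // `count` items have arrived, when `timeSpan` has elapsed since the
    // window opened, or when an item satisfying `flushOn` arrives.
    public static IObservable<IList<T>> BufferWithFlush<T>(
        this IObservable<T> source,
        TimeSpan timeSpan,
        int count,
        Func<T, bool> flushOn)
    {
        // Publish shares a single subscription to the source between the
        // window itself and the signals that close it.
        return source.Publish(shared =>
            Observable.Create<IList<T>>(observer =>
            {
                var timeLimit = Observable.Timer(timeSpan).Select(_ => Unit.Default);
                var sizeLimit = shared.Buffer(count).Select(_ => Unit.Default);
                var flush = shared.Where(flushOn).Select(_ => Unit.Default);

                return shared
                    // The closing selector is invoked again for every new window,
                    // so the timer and the item counter restart each time.
                    .Window(() => Observable.Merge(timeLimit, sizeLimit, flush))
                    .SelectMany(window => window.ToList())
                    // Assumption: empty batches are not wanted, as in the question.
                    .Where(batch => batch.Count > 0)
                    .Subscribe(observer);
            }));
    }
}
```

With this in place, the query above could be written as input.BufferWithFlush(TimeSpan.FromSeconds(10.0), 5, p => p.IS_IMPORTANT). The flushing item should end up as the last element of the batch it closes, because the window receives each item before the closing signals see it.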

Upvotes: 6

decPL

Reputation: 5402

EDIT: @Enigmativity is absolutely correct, refer to his answer. Leaving this one intact, as hopefully it's a bit easier to determine the thought process here.

Try something like the following:

var input = new Subject<Price>(); //your input observable

var flush = new Subject<long>(); //used to manually flush the 'buffer' for important prices
var timeBuffer
   = Observable.Timer(TimeSpan.FromSeconds(10)); //controls the time-based part of 'buffer'
var sizeBuffer = input.Buffer(5).Select(l => 0L); //controls the size-based part of 'buffer'

var bufferedInput = input.Window(()=>Observable.Merge(timeBuffer, sizeBuffer, flush))
                         .SelectMany(w => w.ToList())
                         .Subscribe(w => DO_SOMETHING_WITH_PRICES(w));

//Flush on important price (NOTE - the order of the two subscriptions matters)
input.Where(p => p.IS_IMPORTANT).Subscribe(p => flush.OnNext(0L));

Upvotes: 3
