user4856537
user4856537

Reputation:

Drain IObservable from the top

I would like to consume an IObservable which can be filled at any time.

I have this extension method:

public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
        Func<TSource, IObservable<TOut>> selector)
{
    return Observable.Defer(() =>
    {
        BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

        return source
            .Zip(queue, (v, q) => v)
            .SelectMany(v => selector(v)
                .Do(_ =>
                {

                }, () =>
                {
                    queue.OnNext(new Unit());
                })
            );
    });
}

And I use as like below:

_moviesToTranslateObservable = new Subject<IMovie>();
_moviesToTranslateObservable.Drain(s => Observable.Return(s).Delay(TimeSpan.FromMilliseconds(250)))
  .Subscribe(async movieToTranslate =>
      {
      }

As soon as a new item is pushed :

_moviesToTranslateObservable.OnNext(movieToTranslate);

The IObservable is consumed.

My issue is that, when I add a lot of items, I would like to consume not the first one which has been added, but the last added (like a stack, not a queue).

How can I achieve this? Is BehaviorSubject appropriate for a stack consuming behavior?

Upvotes: 2

Views: 185

Answers (1)

Shlomo
Shlomo

Reputation: 14350

I know the variable name says queue, but that BehaviorSubject isn't really a queue, it's more like a lock. The queueing really happens inside the Zip function, which carries an internal queue.

As far as switching between FIFO and LIFO, I'm not sure what criteria you want, but here's a FIFO version of Drain.

public static IObservable<TOut> DrainReverse<TSource, TOut>(this IObservable<TSource> source,
        Func<TSource, IObservable<TOut>> selector)
{
    return Observable.Defer(() =>
    {
        BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
        var stack = new Stack<TSource>();

        return source
            .Do(item => stack.Push(item))
            .Zip(queue, (v, q) => v)
            .Select(_ => stack.Pop())
            .SelectMany(v => selector(v)
                .Do(_ =>
                {

                }, () =>
                {
                    queue.OnNext(new Unit());
                })
            );
    });
}

When used with the following running code:

var s = new Subject<int>();
var d = s.DrainReverse(i => Observable.Return(i).Delay(TimeSpan.FromMilliseconds(250)));
d.Subscribe(i => Console.WriteLine(i));
s.OnNext(0);
s.OnNext(1);
s.OnNext(2);
s.OnNext(3);
s.OnNext(4);
s.OnNext(5);

Which correctly yields 0, 5, 4, 3, 2, 1

Upvotes: 1

Related Questions