JustinM
JustinM

Reputation: 1013

How to buffer items when another observable emits true, and release them on false

I have a source stream and usually want to emit items as they arrive. But there is another observable - let's call it the "gate". When the gate is closed, the source items should buffer and be released only when the gate is opened.

I've been able to write a function to do this but it seems more complicated than it needs to be. I had to use the Observable.Create method. I assume there is a way to accomplish my goal using just a few lines of more functional code using the Delay or Buffer methods but I can't figure out how. Delay seems especially promising but I can't figure out how to sometimes delay and sometimes allow everything through immediately (a zero delay). Likewise I thought I could use Buffer followed by SelectMany; when the gate is open I'd have buffers of length 1 and when the gate is closed I'd have longer ones, but again I couldn't figure out how to make it work.

Here is what I've built that works with all my tests:

/// <summary>
/// Returns every item in <paramref name="source"/> in the order it was emitted, but starts
/// caching/buffering items when <paramref name="delay"/> emits true, and releases them when
/// <paramref name="delay"/> emits false.
/// </summary>
/// <param name="delay">
/// Functions as "gate" to start and stop the emitting of items. The gate is opened when true
/// and closed when false. The gate is open by default.
/// </param>

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay) =>
    Observable.Create<T>(obs =>
    {
        ImmutableList<T> buffer = ImmutableList<T>.Empty;
        bool isDelayed = false;
        var conditionSubscription =
            delay
            .DistinctUntilChanged()
            .Subscribe(i =>
            {
                isDelayed = i;
                if (isDelayed == false)
                {
                    foreach (var j in buffer)
                    {
                        obs.OnNext(j);
                    }
                    buffer = ImmutableList<T>.Empty;
                }
            });
        var sourceSubscription =
            source
            .Subscribe(i =>
            {
                if (isDelayed)
                {
                    buffer = buffer.Add(i);
                }
                else
                {
                    obs.OnNext(i);
                }
            });
        return new CompositeDisposable(sourceSubscription, conditionSubscription);
    });

Here is another option that passes the tests. It is pretty concise but does not use the Delay or Buffer methods; I need to do the delaying/buffering manually.

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay) =>
    delay
    .StartWith(false)
    .DistinctUntilChanged()
    .CombineLatest(source, (d, i) => new { IsDelayed = d, Item = i })
    .Scan(
        seed: new { Items = ImmutableList<T>.Empty, IsDelayed = false },
        accumulator: (sum, next) => new
        {
            Items = (next.IsDelayed != sum.IsDelayed) ?
                    (next.IsDelayed ? sum.Items.Clear() : sum.Items) :
                    (sum.IsDelayed ? sum.Items.Add(next.Item) : sum.Items.Clear().Add(next.Item)),
            IsDelayed = next.IsDelayed
        })
    .Where(i => !i.IsDelayed)
    .SelectMany(i => i.Items);

These are my tests:

[DataTestMethod]
[DataRow("3-a 6-b 9-c", "1-f", "3-a 6-b 9-c", DisplayName = "Start with explicit no_delay, emit all future items")]
[DataRow("3-a 6-b 9-c", "1-f 2-f", "3-a 6-b 9-c", DisplayName = "Start with explicit no_delay+no_delay, emit all future items")]
[DataRow("3-a 6-b 9-c", "1-t", "", DisplayName = "Start with explicit delay, emit nothing")]
[DataRow("3-a 6-b 9-c", "1-t 2-t", "", DisplayName = "Start with explicit delay+delay, emit nothing")]
[DataRow("3-a 6-b 9-c", "5-t 10-f", "3-a 10-b 10-c", DisplayName = "When delay is removed, all cached items are emitted in order")]
[DataRow("3-a 6-b 9-c 12-d", "5-t 10-f", "3-a 10-b 10-c 12-d", DisplayName = "When delay is removed, all cached items are emitted in order")]
public void DelayWhile(string source, string isDelayed, string expectedOutput)
{
    (long time, string value) ParseEvent(string e)
    {
        var parts = e.Split('-');
        long time = long.Parse(parts[0]);
        string val = parts[1];
        return (time, val);
    }
    IEnumerable<(long time, string value)> ParseEvents(string s) => s.Split(new char[] { ' ' }, StringSplitOptions.RemoveEmptyEntries).Select(ParseEvent);
    var scheduler = new TestScheduler();
    var sourceEvents = ParseEvents(source).Select(i => OnNext(i.time, i.value)).ToArray();
    var sourceStream = scheduler.CreateHotObservable(sourceEvents);
    var isDelayedEvents = ParseEvents(isDelayed).Select(i => OnNext(i.time, i.value == "t")).ToArray();
    var isDelayedStream = scheduler.CreateHotObservable(isDelayedEvents);
    var expected = ParseEvents(expectedOutput).Select(i => OnNext(i.time, i.value)).ToArray();
    var obs = scheduler.CreateObserver<string>();
    var result = sourceStream.DelayWhile(isDelayedStream);
    result.Subscribe(obs);
    scheduler.AdvanceTo(long.MaxValue);
    ReactiveAssert.AreElementsEqual(expected, obs.Messages);
}

[TestMethod]
public void DelayWhile_SubscribeToSourceObservablesOnlyOnce()
{
    var scheduler = new TestScheduler();
    var source = scheduler.CreateHotObservable<int>();
    var delay = scheduler.CreateHotObservable<bool>();

    // No subscriptions until subscribe
    var result = source.DelayWhile(delay);
    Assert.AreEqual(0, source.ActiveSubscriptions());
    Assert.AreEqual(0, delay.ActiveSubscriptions());

    // Subscribe once to each
    var obs = scheduler.CreateObserver<int>();
    var sub = result.Subscribe(obs);
    Assert.AreEqual(1, source.ActiveSubscriptions());
    Assert.AreEqual(1, delay.ActiveSubscriptions());

    // Dispose subscriptions when subscription is disposed
    sub.Dispose();
    Assert.AreEqual(0, source.ActiveSubscriptions());
    Assert.AreEqual(0, delay.ActiveSubscriptions());
}

[TestMethod]
public void DelayWhile_WhenSubscribeWithNoDelay_EmitCurrentValue()
{
    var source = new BehaviorSubject<int>(1);
    var emittedValues = new List<int>();
    source.DelayWhile(Observable.Return(false)).Subscribe(i => emittedValues.Add(i));
    Assert.AreEqual(1, emittedValues.Single());
}

// Subscription timing issue?
[TestMethod]
public void DelayWhile_WhenSubscribeWithDelay_EmitNothing()
{
    var source = new BehaviorSubject<int>(1);
    var emittedValues = new List<int>();
    source.DelayWhile(Observable.Return(true)).Subscribe(i => emittedValues.Add(i));
    Assert.AreEqual(0, emittedValues.Count);
}

[TestMethod]
public void DelayWhile_CoreScenario()
{
    var source = new BehaviorSubject<int>(1);
    var delay = new BehaviorSubject<bool>(false);
    var emittedValues = new List<int>();

    // Since no delay when subscribing, emit value
    source.DelayWhile(delay).Subscribe(i => emittedValues.Add(i));
    Assert.AreEqual(1, emittedValues.Single());

    // Turn on delay and buffer up a few; nothing emitted
    delay.OnNext(true);
    source.OnNext(2);
    source.OnNext(3);
    Assert.AreEqual(1, emittedValues.Single());

    // Turn off delay; should release the buffered items
    delay.OnNext(false);
    Assert.IsTrue(emittedValues.SequenceEqual(new int[] { 1, 2, 3 }));
}

Upvotes: 3

Views: 142

Answers (2)

JustinM
JustinM

Reputation: 1013

A proposed concise answer. It looks like it should work but it doesn't pass all the tests.

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay)
{
    source = source.Publish().RefCount();
    delay = delay.Publish().RefCount();
    var delayRemoved = delay.Where(i => i == false);
    var sourceWhenNoDelay = source.WithLatestFrom(delay.StartWith(false), (s, d) => d).Where(i => !i);
    return
        source
        .Buffer(bufferClosingSelector: () => delayRemoved.Merge(sourceWhenNoDelay))
        .SelectMany(i => i);
}

Upvotes: 0

Shlomo
Shlomo

Reputation: 14350

EDIT: I forgot about the problems you'll run into with Join and Join-based operators (like WithLatestFrom) when having two cold observables. Needless to say, that criticism mentioned below about lack of transactions is more apparent than ever.

I would recommend this, which is more like my original solution but using the Delay overload. It passes all tests except DelayWhile_WhenSubscribeWithDelay_EmitNothing. To get around that, I would create an overload that would accept a starting default value:

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay, bool isGateClosedToStart)
{
    return source.Publish(_source => delay
        .DistinctUntilChanged()
        .StartWith(isGateClosedToStart)
        .Publish(_delay => _delay
            .Select(isGateClosed => isGateClosed
                ? _source.TakeUntil(_delay).Delay(_ => _delay)
                : _source.TakeUntil(_delay)
            )
            .Merge()
        )
    );
}

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay)
{
    return DelayWhile(source, delay, false);
}

Old answer:

I read a book recently criticizing Rx for not supporting transactions, and my first try at solving this would be a great example why:

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay)
{
    return source.Publish(_source => delay
        .DistinctUntilChanged()
        .StartWith(false)
        .Publish(_delay => _delay
            .Select(isGateClosed => isGateClosed 
                ? _source.Buffer(_delay).SelectMany(l => l) 
                : _source)
            .Switch()
        )
    );
}

That should work, except there's too many things relying on the delay observable, and the subscription order matters: In this case the Switch switches before the Buffer ends, so nothing ends up coming out when the delay gate is closed.

This can be fixed as follows:

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay)
{
    return source.Publish(_source => delay
        .DistinctUntilChanged()
        .StartWith(false)
        .Publish(_delay => _delay
            .Select(isGateClosed => isGateClosed 
                ? _source.TakeUntil(_delay).Buffer(_delay).SelectMany(l => l) 
                : _source.TakeUntil(_delay)
            )
            .Merge()
        )
    );
}

My next try passed all your tests, and uses your desired Observable.Delay overload as well:

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay)
{
    return delay
        .DistinctUntilChanged()
        .StartWith(false)
        .Publish(_delay => source
            .Join(_delay,
                s => Observable.Empty<Unit>(),
                d => _delay,
                (item, isGateClosed) => isGateClosed 
                    ? Observable.Return(item).Delay(, _ => _delay) 
                    : Observable.Return(item)
            )
            .Merge()
    );
}

The Join could be reduced to a WithLatestFrom like this:

public static IObservable<T> DelayWhile<T>(this IObservable<T> source, IObservable<bool> delay)
{
    return delay
        .DistinctUntilChanged()
        .StartWith(false)
        .Publish(_delay => source
            .WithLatestFrom(_delay,
                (item, isGateClosed) => isGateClosed 
                    ? Observable.Return(item).Delay(_ => _delay) 
                    : Observable.Return(item)
            )
            .Merge()
    );
}

Upvotes: 1

Related Questions