Reputation: 1028
I'm writing a custom RX operator that combines features from both Throttle and Delay, with the following signature
public static IObservable<T> DelayWhen(this IObservable<T> self, TimeSpan delay, Func<T, bool> condition);
The rules are as follows:
condition(t)
returns false
, emit immediately.condition(t)
returns true
, delay for delay
time.self
emits a value during a delay then do the following:
condition(t)
returns false
, cancel/skip the value scheduled for delayed emission and emit the new valuecondition(t)
returns true
then skip/ignore this new value (i.e. the delayed value will emit if self
does not emit any more values during the delay).As you can tell from the rules, there is some behavior reminiscent of throttling going on here.
My various attempts at solving this issue include some async
approaches that just grew to complex. I really feel this should be solvable using existing operators. E.g. see https://stackoverflow.com/a/16290788/2149075, which uses Amb
quite neatly and which I feel is really close to what I want to achieve.
Upvotes: 0
Views: 309
Reputation: 43429
Here is an implementation of the DelayWhen
operator, that is based on the built-in Window
operator:
Update: My original implementation (Revision 1) did not satisfy the requirements of the question, so I changed it by replacing the Delay
operator with a custom-made delaying/throttling operator.
/// <summary>
/// Either delays the emission of the elements that satisfy the condition, by the
/// specified time duration, or ignores them, in case they are produced before
/// the emission of previously delayed element. Elements that don't satisfy the
/// condition are emitted immediately, and they also cancel any pending emission of
/// all previously delayed elements.
/// </summary>
public static IObservable<T> DelayWhen<T>(this IObservable<T> source,
TimeSpan delay, Func<T, bool> condition, IScheduler scheduler = null)
{
// Arguments validation omitted
scheduler ??= DefaultScheduler.Instance;
return source
.Select(x => (Item: x, WithDelay: condition(x)))
.Publish(published => published.Window(published.Where(e => !e.WithDelay)))
.Select(w => Observable.Merge(
DelayThrottleSpecial(w.Where(e => e.WithDelay), delay, scheduler),
w.Where(e => !e.WithDelay)
))
.Switch()
.Select(e => e.Item);
/// <summary>
/// Time shifts the observable sequence by the specified time duration, ignoring
/// elements that are produced while a previous element is scheduled for emission.
/// </summary>
static IObservable<T2> DelayThrottleSpecial<T2>(IObservable<T2> source,
TimeSpan dueTime, IScheduler scheduler)
{
int mutex = 0; // 0: not acquired, 1: acquired
return source.SelectMany(x =>
{
if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
return Observable.Return(x)
.DelaySubscription(dueTime, scheduler)
.Finally(() => Volatile.Write(ref mutex, 0));
return Observable.Empty<T2>();
});
}
}
The source sequence is partitioned in consecutive windows (subsequences), with each window ending with a false
(non delayed) element. Each window is then projected to a new window that has its true
(delayed) elements delayed/throttled according to the requirements. Finally the projected windows are merged back to a single sequence by using the Switch
operator, so that all pending elements of a window are discarded every time a new window is emitted.
Upvotes: 2
Reputation: 14350
The question isn't completely clear, so using the following test case as a scenario:
Observable.Interval(TimeSpan.FromSeconds(1))
.Take(10)
.DelayWhen(TimeSpan.FromSeconds(1.5), i => i % 3 == 0 || i % 2 == 0)
This should result in the following:
// T: ---1---2---3---4---5---6---7---8---9---0---1----
// original: ---0---1---2---3---4---5---6---7---8---9
// delay?: ---Y---N---Y---Y---Y---N---Y---N---Y---Y
// expected: -------1---------2-----5-------7-------------8
//
// 0: Delayed, but interrupted by 1,
// 1: Non-delayed, emit immediately
// 2: Delayed, emit after 1.5 seconds
// 3: Delayed, since emitted during a delay, ignored
// 4: Delayed, but interrupted by 5.
// 5: Non-delayed, emit immediately
// 6: Delayed, but interrupted by 7.
// 7: Non-delayed, emit immediately
// 8: Delayed, but interrupted by 9
// 9: Delayed, since emitted during a delay, ignored
If that doesn't line up with the requirements, please clarify the question. @Theodore's solution gets the timing right, but emits 3 and 9, ignoring the "cancel/skip the value scheduled for delayed emission and emit the new value" clause.
This is functionally equivalent to Theodore's code, but (IMO) easier to work with and understand:
public static IObservable<T> DelayWhen2<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition, IScheduler scheduler)
{
return source
.Select(x => (Item: x, WithDelay: condition(x)))
.Publish(published => published
.SelectMany(t => t.WithDelay
? Observable.Return(t)
.Delay(delay, scheduler)
.TakeUntil(published.Where(t2 => !t2.WithDelay))
: Observable.Return(t)
)
)
.Select(e => e.Item);
}
From there, I had to embed the state of whether or not you're in delay with .Scan
:
public static IObservable<T> DelayWhen3<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition)
{
return DelayWhen3(source, delay, condition, Scheduler.Default);
}
public static IObservable<T> DelayWhen3<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition, IScheduler scheduler)
{
return source
.Select(x => (Item: x, WithDelay: condition(x)))
.Publish(published => published
.Timestamp(scheduler)
.Scan((delayOverTime: DateTimeOffset.MinValue, output: Observable.Empty<T>()), (state, t) => {
if(!t.Value.WithDelay)
//value isn't delayed, current delay status irrelevant, emit immediately, and cancel previous delay.
return (DateTimeOffset.MinValue, Observable.Return(t.Value.Item));
else
if (state.delayOverTime > t.Timestamp)
//value should be delayed, but current delay already in progress. Ignore value.
return (state.delayOverTime, Observable.Empty<T>());
else
//value should be delayed, no delay in progress. Set delay state, and return delayed observable.
return (t.Timestamp + delay, Observable.Return(t.Value.Item).Delay(delay, scheduler).TakeUntil(published.Where(t2 => !t2.WithDelay)));
})
)
.SelectMany(t => t.output);
}
In the .Scan
operator, you embed the time when the previous Delay
expires. That way you know can handle a value that should be delayed within an existing delay. I added scheduler
parameters to the time-sensitive functions to enable testing:
var ts = new TestScheduler();
var target = Observable.Interval(TimeSpan.FromSeconds(1), ts)
.Take(10)
.DelayWhen3(TimeSpan.FromSeconds(1.5), i => i % 3 == 0 || i % 2 == 0, ts);
var observer = ts.CreateObserver<long>();
target.Subscribe(observer);
ts.Start();
var expected = new List<Recorded<Notification<long>>> {
new Recorded<Notification<long>>(2000.MsTicks(), Notification.CreateOnNext<long>(1)),
new Recorded<Notification<long>>(4500.MsTicks(), Notification.CreateOnNext<long>(2)),
new Recorded<Notification<long>>(6000.MsTicks(), Notification.CreateOnNext<long>(5)),
new Recorded<Notification<long>>(8000.MsTicks(), Notification.CreateOnNext<long>(7)),
new Recorded<Notification<long>>(10500.MsTicks(), Notification.CreateOnNext<long>(8)),
new Recorded<Notification<long>>(10500.MsTicks() + 1, Notification.CreateOnCompleted<long>()),
};
ReactiveAssert.AreElementsEqual(expected, observer.Messages);
And code for MsTicks:
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
Upvotes: 3