Reputation: 4909
Before I go write one I thought it would be worth asking: RX has a throttle extension method which discards events if they occur too quickly.
So if you asked it to throttle events to 1 per 5 seconds and you received an event after 0.1 seconds, then a second event 1 second later, you would get one event followed by silence.
What I want is for it to raise the first event after 0.1 seconds, but to then raise another event 4.9 seconds later.
Further, if I receive events at 0.1, 1 and 2 seconds, I want it to raise the event at 0.1 seconds, 5 seconds and then nothing, so I don't want it capturing n events and only releasing one per period for n periods.
Buffer does the opposite, in that it saves everything for 5 seconds and then raises the event, so what I want is neither throttle nor buffer, but something in between.
Is there a way to do this with the existing framework, or do I need to write one?
Upvotes: 2
Views: 950
Reputation: 10783
I think you will have to write your own operator, or do some toying around with Window.
Like the other commenters, I am not 100% sure of your requirements, but I have tried to capture them in these tests.
using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using NUnit.Framework;
/// <summary>
/// Specifies a throttle that sits between Rx's <c>Throttle</c> and <c>Buffer</c>:
/// within each fixed-length window the first event is published immediately and,
/// if further events arrived, only the most recent one is published when the
/// window closes.
/// </summary>
[TestFixture]
public class Throttle : ReactiveTest
{
    private TestScheduler _testScheduler;
    private ITestableObservable<int> _sourceSequence;
    private ITestableObserver<int> _observer;

    /// <summary>
    /// Builds the source sequence, applies the windowed throttle query and runs the
    /// virtual-time scheduler to completion so every test can inspect
    /// <c>_observer.Messages</c>.
    /// </summary>
    [SetUp]
    public void SetUp()
    {
        var windowPeriod = TimeSpan.FromSeconds(5);
        _testScheduler = new TestScheduler();
        _sourceSequence = _testScheduler.CreateColdObservable(
            // Open question for the requirements: does the window start when the first
            // event arrives, or at time 0? This implementation starts windows at
            // subscription (virtual time 0): 0-5s, 5-10s, 10-15s, ...
            OnNext(0.1.Seconds(), 1),   // window 0-5s: three events
            OnNext(1.0.Seconds(), 2),
            OnNext(2.0.Seconds(), 3),
            OnNext(7.0.Seconds(), 4),   // window 5-10s: a single event
            OnNext(11.0.Seconds(), 5),  // window 10-15s: exactly two events
            OnNext(12.0.Seconds(), 6),
            OnCompleted<int>(100.0.Seconds())
        );
        _observer = _testScheduler.CreateObserver<int>();

        _sourceSequence
            .Window(windowPeriod, _testScheduler)
            .SelectMany(window =>
                window.Publish(
                    // 'shared' is hot: Concat subscribes to the second operand only after
                    // Take(1) has already consumed the first element, so that element is
                    // never seen again. No Skip(1) is needed here; adding one would drop
                    // the window's second element and lose the trailing event whenever a
                    // window holds exactly two events.
                    shared => shared.Take(1).Concat(shared.TakeLast(1))
                )
            )
            .Subscribe(_observer);

        _testScheduler.Start();
    }

    /// <summary>The first event of a window is forwarded at the moment it occurs.</summary>
    [Test]
    public void Should_eagerly_publish_new_events()
    {
        Assert.AreEqual(OnNext(0.1.Seconds(), 1), _observer.Messages[0]);
    }

    /// <summary>The latest event of a window is forwarded when that window closes.</summary>
    [Test]
    public void Should_publish_last_event_of_a_window()
    {
        // The event at 1.0s (value 2) is ignored. Value 3 occurs after it (at 2.0s) and
        // before the end of the window, so it is yielded when the window closes at 5.0s.
        Assert.AreEqual(OnNext(5.0.Seconds(), 3), _observer.Messages[1]);
    }

    /// <summary>A window containing a single event publishes it once, not again at window close.</summary>
    [Test]
    public void Should_only_publish_event_once_if_it_is_the_only_event_for_the_window()
    {
        Assert.AreEqual(OnNext(7.0.Seconds(), 4), _observer.Messages[2]);
        // The next message belongs to the following window, i.e. value 4 was not repeated at 10.0s.
        Assert.AreEqual(OnNext(11.0.Seconds(), 5), _observer.Messages[3]);
    }

    /// <summary>
    /// Regression test: when a window holds exactly two events, the second one must
    /// still be published at window close rather than being discarded.
    /// </summary>
    [Test]
    public void Should_publish_second_event_if_window_has_exactly_two_events()
    {
        Assert.AreEqual(OnNext(11.0.Seconds(), 5), _observer.Messages[3]);
        Assert.AreEqual(OnNext(15.0.Seconds(), 6), _observer.Messages[4]);
    }

    /// <summary>Verifies the complete output sequence, including completion, in one assertion.</summary>
    [Test]
    public void AsOneTest()
    {
        var expected = new[]
        {
            OnNext(0.1.Seconds(), 1),
            // Value 2 (at 1.0s) is ignored; value 3 (at 2.0s) is the last event of the
            // first window and is yielded when that window closes at 5.0s.
            OnNext(5.0.Seconds(), 3),
            OnNext(7.0.Seconds(), 4),
            OnNext(11.0.Seconds(), 5),
            // Value 6 (at 12.0s) is the trailing event of a two-event window, yielded at 15.0s.
            OnNext(15.0.Seconds(), 6),
            OnCompleted<int>(100.0.Seconds())
        };

        CollectionAssert.AreEqual(expected, _observer.Messages);
    }
}
Upvotes: 6