Reputation: 1225
I'm starting the development with Reactive extensions (version 2.1, just in case) and for my sample application I need a sequence of int values pushed with some interval, i.e. every 1 second.
I know, that I can create a sequence with Observable.Range<int>(0,10)
but I can't figure out how to set relative time between pushes. I've tried Delay()
but shifts the sequence only once at the start.
I then found Observable.Generate()
method that could be adjusted to this task in next way:
var delayed = Observable.
Generate(0, i => i <= 10, i => i + 1, i => i,
i => TimeSpan.FromSeconds(1));
But that seems to be working only for simple 'for-each-like' defined sequences. So, in general, my question is, whether we can to get any source sequence and wrap it with some proxy that will pull messages from source and push it further with time delay?
S--d1--d2--d3--d4--d5-|
D--d1-delay-d2-delay-d3-delay-d4-delay-d5-|
P.S. If this approach contradicts to the concept of ReactiveExtensions, please also note this. I don't want to do it "by all means" and them get some other design problems in future.
P.P.S General Idea is to make sure that output sequence has a specified interval between events in spite of if the input sequence is finite or infinite and how often it pushes events.
Upvotes: 6
Views: 3394
Reputation: 9638
I know this an old question but I think I have the correct answer.
Zipping an Observable.Timer produces 'ticks' even if the source Observable is not producing anything. This means that once the source produces another item any ticks that were already produced, and not yet consumed, will be used. Resulting in an observable that will add a delay between items when the producer produces items at a steady rate, but which will produce bursts of items if the producer sometimes takes a longer time to produce an item.
To circumvent this you will need to generate a timer, that produces only one item, between every item that is produced by your observable. You can do this with Observable.Switch like this:
var subject = new Subject<Unit>();
var producer = subject.SelectMany(
_ =>
{
return new[]
{
Observable.Return(true),
Observable.Timer(TimeSpan.FromSeconds(2))
.Select(q => false)
};
})
.Switch();
Upvotes: 2
Reputation: 29786
Observable.Interval is what you want to look at. It will generate a 0 based long value incrementing by 1 every interval of time that you specify e.g.:
Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(x => Console.WriteLine(x));
You can then use a projection (Select
) to offset/change this value as needed.
You can also "pace" one stream with another by using the Zip operator - you may want to look at that too. Zip pairs events from two streams together, so it emits at the pace of the currently slowest stream. Zip is quite flexible too, it can zip any number of streams, or even zip an IObservable to an IEnumerable. Here's an example of that:
var pets = new List<string> { "Dog", "Cat", "Elephant" };
var pace = Observable.Interval(TimeSpan.FromSeconds(1))
.Zip(pets, (n, p) => p)
.Subscribe(x => Console.WriteLine(x), () => Console.WriteLine("Done"));
This writes out pets at an interval of 1 second.
In light of the P.P.S. added above, I will give another answer - I'll leave this for reference as it is a useful technique anyway.
Upvotes: 9
Reputation: 6008
Perhaps what you're looking for is the Buffer
extension method. It's signature is defined like this:
public static IObservable<IList<TSource>> Buffer<TSource>(
this IObservable<TSource> source,
TimeSpan timeSpan)
It will transform the source sequence in a way that the values are produced in batch as often as timeSpan
.
Upvotes: -1
Reputation: 117064
Here's the simplest approach to do what you want:
var delayed =
source.Do(x => Thread.Sleep(1000));
It adds a second delay, but it does do so before the first item. You could certainly wrap up some logic not to put a delay at the start. That wouldn't be too hard.
Here's an alternative that schedules the delay on a brand new trend.
var delayed =
Observable.Create<int>(o =>
{
var els = new EventLoopScheduler();
return source
.ObserveOn(els)
.Do(x => els.Schedule(() => Thread.Sleep(1000)))
.Subscribe(o);
});
Upvotes: 1
Reputation: 29786
So, to clarify, you want the output to push the input at a rate no faster than the interval, but otherwise as fast as possible.
In that case try this. The input
variable construction is a hacky way to create a short sporadic sequence that is sometimes faster and sometimes slower than a 2 second pace. Note the output from the stopwatch will show up the small inaccuracies in the timer mechanism Rx uses.
var input = Observable.Interval(TimeSpan.FromSeconds(1)).Take(4);
input = input.Concat(Observable.Interval(TimeSpan.FromSeconds(5)).Take(2));
var interval = TimeSpan.FromSeconds(2);
var paced = input.Select(i => Observable.Empty<long>()
.Delay(interval)
.StartWith(i)).Concat();
var stopwatch = new Stopwatch();
stopwatch.Start();
paced.Subscribe(
x => Console.WriteLine(x + " " + stopwatch.ElapsedMilliseconds),
() => Console.WriteLine("Done"));
This sample works by projecting each tick from the input into a sequence that has the tick as a single event at the start, but doesn't OnComplete for the desired interval. The resulting stream of streams is then concatenated. This approach ensures new ticks are emitted immediately if the output is currently "flushed", but buffered up behind each other otherwise.
You can wrap this up in an extension method to make it generic.
Upvotes: 4