Colonel Panic

Reputation: 137584

How to 'delay' observable sequence by one value?

In Reactive Extensions, how can I 'delay' an observable sequence by one value? For example:

original: 2 3 5 7 9   
delayed:    2 3 5 7

To be clear, I want to delay the sequence by one step. This isn't the same as delaying by a constant time.

Upvotes: 0

Views: 166

Answers (3)

James World

Reputation: 29786

Just realized there's an even neater, shorter approach:

var delayedByOne = source.Publish(ps => ps.Delay(_ => ps));

Upvotes: 3

James World

Reputation: 29786

No Publish required, but it needs to be wrapped in a method to get nice type inference, like so:

public static class ObservableExtensions
{
    public static IObservable<T> DelayByOne<T>(
        this IObservable<T> source)
    {
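        // Carry (current, previous) through Scan, emit the previous value,
        // and skip the first element, whose "previous" is only default(T).
        return source
            .Scan(
                Tuple.Create(default(T), default(T)),
                (a, i) => Tuple.Create(i, a.Item1))
            .Select(a => a.Item2)
            .Skip(1);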
    }
}

This idea originated from a comment on my blog post: http://www.zerobugbuild.com/?p=213#comment-884. The method above is really a specialization of a more general comparison function. From the comments on my original blog post, here's the revised utility function for comparing current and prior events:

// The result selector is passed (curr, prev) and
// prev is default(T) for first event
public static class ObservableExtensions
{
    public static IObservable<TResult> CombineWithPrevious<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource,TSource,TResult> resultSelector)
    {
        return source.Scan(
            Tuple.Create(default(TSource), default(TSource)),
                (a, i) => Tuple.Create(i,a.Item1))
            .Select(a => resultSelector(a.Item1, a.Item2));
    }
}

So you could use the above like so:

var delayedByOne = source.CombineWithPrevious((x,y) => y).Skip(1);
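To check the behaviour against the sequence from the question, here is a minimal sketch. It assumes the System.Reactive NuGet package is referenced; the class name `DelayByOneSketch` and the `Run` helper are made up for illustration, and `CombineWithPrevious` is repeated only so the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical container class, for illustration only.
public static class DelayByOneSketch
{
    // Same shape as the utility above: the selector receives (curr, prev),
    // and prev is default(TSource) for the first event.
    public static IObservable<TResult> CombineWithPrevious<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, TSource, TResult> resultSelector)
    {
        return source
            .Scan(
                Tuple.Create(default(TSource), default(TSource)),
                (a, i) => Tuple.Create(i, a.Item1))
            .Select(a => resultSelector(a.Item1, a.Item2));
    }

    // Runs the one-liner over the sequence from the question and
    // collects everything it emits into a list.
    public static IList<int> Run()
    {
        var source = new[] { 2, 3, 5, 7, 9 }.ToObservable();

        // Keep only the previous value, then drop the first element,
        // whose "previous" is just default(int).
        var delayedByOne = source.CombineWithPrevious((x, y) => y).Skip(1);

        // ToList().Wait() blocks until the sequence completes.
        return delayedByOne.ToList().Wait();
    }
}
```

Calling `DelayByOneSketch.Run()` returns 2, 3, 5, 7: each value only appears once the next one has arrived, so the final 9 is never emitted.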

Upvotes: 3

Enigmativity

Reputation: 117064

Try this:

var delayedByOne = source.Zip(source.Skip(1), (x, _) => x);

But if the source is a cold observable, the version above subscribes to it twice. To avoid running two concurrent copies of the source, you may need to do this instead:

var delayedByOne = source.Publish(s => s.Zip(s.Skip(1), (x, _) => x));
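The difference between the two variants can be made visible by counting subscriptions. The following is only a sketch, assuming the System.Reactive NuGet package; the `ZipDemo` class and its `Run` method are hypothetical names, and `Observable.Defer` stands in for any cold source.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical demo class, for illustration only.
public static class ZipDemo
{
    // Returns the emitted values and how many times the cold source
    // was subscribed to.
    public static Tuple<IList<int>, int> Run(bool publish)
    {
        var subscriptions = 0;

        // Cold source: Defer runs this factory once per subscriber.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return new[] { 2, 3, 5, 7, 9 }.ToObservable();
        });

        var delayedByOne = publish
            // One shared subscription to the underlying source.
            ? source.Publish(s => s.Zip(s.Skip(1), (x, _) => x))
            // Two independent subscriptions to the underlying source.
            : source.Zip(source.Skip(1), (x, _) => x);

        // Block until the sequence completes and collect its values.
        var values = delayedByOne.ToList().Wait();
        return Tuple.Create(values, subscriptions);
    }
}
```

Both `ZipDemo.Run(false)` and `ZipDemo.Run(true)` yield 2, 3, 5, 7, but the first reports two subscriptions to the source and the second only one. That matters when subscribing has side effects or is expensive, for example a source that opens a connection.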

Upvotes: 6
