Homde

Reputation: 4286

Change interval of RX operators?

This might be a stupid question as I'm a bit new to RX :)

I'm sampling an event (RX for .Net 4.0):

eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Timestamp().Subscribe(x => Console.WriteLine("testing:" + x.Value.EventArgs.str));

The problem is that the sampling interval needs to change on the fly. I guess I could add a property that removes the existing handler and creates a new one whenever the interval changes, but that seems messy and more vulnerable to timing issues. Is there a way to simply change the interval?

Example: say someone is typing a string of characters. When a certain sequence is detected, you want to change the sampling interval without missing an event, and preferably without receiving any event more than once.

Upvotes: 6

Views: 1961

Answers (4)

Igor Dvorkin

Reputation: 827

TL;DR: Create an observable using ObservableFromIntervalFunctor, as shown below:

void Main()
{
    // Pick an initial period, it can be changed later.
    var intervalPeriod = TimeSpan.FromSeconds(1);

    // Create an observable using a functor that captures the interval period.
    var o = ObservableFromIntervalFunctor(() => intervalPeriod);

    // Log every value so we can visualize the observable.
    o.Subscribe(Console.WriteLine);

    // Sleep for a while so you can observe the observable.
    Thread.Sleep(TimeSpan.FromSeconds(5.0));

    // Changing the interval period takes effect on the next tick.
    intervalPeriod = TimeSpan.FromSeconds(0.3);

}

IObservable<long> ObservableFromIntervalFunctor(Func<TimeSpan> intervalPeriodFunctor)
{
    return Observable.Generate(0L, s => true, s => s + 1, s => s, s => intervalPeriodFunctor());
}

Explanation: Observable.Generate has an overload that takes a functor specifying the delay before each value is generated. If that functor reads a captured TimeSpan variable, you can change the observable's interval period simply by assigning a new value to that variable.
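To tie this back to the question, the variable-period ticker can be passed to the Sample overload that takes a sampler observable. The following is only a sketch: SampleWithChangingPeriod is a hypothetical helper name, not an Rx built-in, and it assumes a version of Rx that has the Sample(sampler) overload. As far as I know, Sample only emits on a tick if a new source value has arrived since the previous tick, so the same event should not be reported twice.

```csharp
using System;
using System.Reactive.Linq;

public static class VariableSampling
{
    // Hypothetical helper (not part of Rx): samples `source` on a ticker whose
    // delay is re-read from `periodProvider` before every tick.
    public static IObservable<T> SampleWithChangingPeriod<T>(
        this IObservable<T> source, Func<TimeSpan> periodProvider)
    {
        var ticker = Observable.Generate(
            0L,                      // initial state
            _ => true,               // never complete
            s => s + 1,              // next state
            s => s,                  // value emitted on each tick
            _ => periodProvider());  // delay before this tick is emitted

        // Emit the latest source value each time the ticker fires.
        return source.Sample(ticker);
    }
}

// Usage sketch (eventAsObservable is the question's event stream):
//   var period = TimeSpan.FromSeconds(1);
//   eventAsObservable.SampleWithChangingPeriod(() => period).Timestamp().Subscribe(...);
//   period = TimeSpan.FromSeconds(0.3); // picked up when the next tick is scheduled
```

A tick that is already scheduled still fires after the old delay; the new period only applies from the tick after that.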


Upvotes: 2

Richard Szalay

Reputation: 84734

I know this question has already been answered, but I thought I'd add a few more ways of tackling it the Rx way.

You could use Switch on a sequence of TimeSpan values:

private Subject<TimeSpan> sampleFrequencies = new Subject<TimeSpan>();

sampleFrequencies
    .Select(x => eventAsObservable.Sample(Observable.Interval(x)).Timestamp())
    .Switch()
    .Subscribe(x => Console.WriteLine("testing:" + x.Value.EventArgs.str));

// To change:
// sampleFrequencies.OnNext(TimeSpan.FromSeconds(5));
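One thing to watch with a plain Subject is that nothing is sampled until the first OnNext call. A possible variation, sketched here under the assumption that you want an initial interval, is to feed the periods from a BehaviorSubject, which replays its current value to new subscribers. SampleLatestPeriod is a hypothetical helper name used only for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchSampling
{
    // Hypothetical helper: re-samples `source` at whatever period `periods` last emitted.
    public static IObservable<T> SampleLatestPeriod<T>(
        this IObservable<T> source, IObservable<TimeSpan> periods)
    {
        return periods
            .Select(period => source.Sample(period)) // one sampled stream per period
            .Switch();                               // keep only the newest stream
    }
}

// Usage sketch (eventAsObservable is the question's event stream):
//   var periods = new BehaviorSubject<TimeSpan>(TimeSpan.FromSeconds(1));
//   eventAsObservable.SampleLatestPeriod(periods).Timestamp().Subscribe(...);
//   periods.OnNext(TimeSpan.FromSeconds(5)); // switch to a 5-second interval
```

Note that switching discards the old sampling window, so a value that arrived since the last tick of the old interval is only reported if it is still the latest value when the new interval first ticks.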

Alternatively, it could also be solved using Defer, TakeUntil and Repeat (this one is a little crazier and is included as a thought exercise):

private TimeSpan sampleFrequency = TimeSpan.FromSeconds(2);
private Subject<Unit> frequencyChanged = new Subject<Unit>();

Observable
    .Defer(() => eventAsObservable
        .Sample(Observable.Interval(sampleFrequency))
        .Timestamp()
        .TakeUntil(frequencyChanged))
    .Repeat()
    .Subscribe(x => Console.WriteLine("testing:" + x.Value.EventArgs.str));

// To change: 
// sampleFrequency = TimeSpan.FromSeconds(5);
// frequencyChanged.OnNext(new Unit());

Upvotes: 6

Jon Skeet

Reputation: 1500055

I don't know of a way of changing the existing sampling interval, but what you could do is sample at the highest frequency you'll need, and then filter with a Where clause which uses a variable you can change.

For example:

static IObservable<T> SampleEvery<T>(this IObservable<T> source,
    Func<int> multipleProvider)
{
    int counter = 0;
    Func<T, bool> predicate = ignored => {
        counter++;
        if (counter >= multipleProvider())
        {
            counter = 0;
        }
        return counter == 0;
    };
    return source.Where(predicate);
}

You'd then call it like this:

// Keep this somewhere you can change it
int multiple = 1;

eventAsObservable.Sample(TimeSpan.FromSeconds(1))
                 .SampleEvery(() => multiple)
                 .Timestamp()
                 .Subscribe(x => Console.WriteLine("testing:" + 
                                                   x.Value.EventArgs.str));

Now, changing the value of multiple will change the effective sampling frequency.

It's a pretty ugly hack, but I think it should work.
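To see which elements the filter lets through, here is a small check against a finite sequence instead of the real event stream. The extension method is repeated from above so the snippet compiles on its own; SampleEveryDemo and Run are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class SampleEveryDemo
{
    // Repeated from the answer so this snippet is self-contained.
    public static IObservable<T> SampleEvery<T>(this IObservable<T> source,
        Func<int> multipleProvider)
    {
        int counter = 0;
        return source.Where(ignored =>
        {
            counter++;
            if (counter >= multipleProvider())
            {
                counter = 0;
            }
            return counter == 0;
        });
    }

    // Hypothetical helper: pushes 1..count through the filter and collects what passes.
    public static IList<int> Run(int count, int multiple)
    {
        return Observable.Range(1, count)
                         .SampleEvery(() => multiple)
                         .ToList()
                         .Wait();
    }
}

// SampleEveryDemo.Run(6, 2) yields 2, 4, 6
// SampleEveryDemo.Run(6, 3) yields 3, 6
```

Keep in mind that the counter counts values that Sample actually emitted, not timer ticks. If no new event arrives during a base interval, Sample emits nothing for that tick, so the effective period is only "multiple times the base interval" while events keep arriving.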

Upvotes: 8

Ana Betts

Reputation: 74654

Why don't you just subscribe twice?

Observable.Merge(
    eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Timestamp().SelectMany(x => doLocalLookup(x)),
    eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Timestamp().SelectMany(x => doRemoteLookup(x))
).Subscribe(Console.WriteLine);

Or if searches are only active based on some sort of prefix or qualifier like Google Chrome's '?' operator:

Observable.Merge(
    eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Where(x => isLocal(x)).SelectMany(x => doLocalLookup(x)),
    eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Where(x => isARemoteQuery(x)).SelectMany(x => doRemoteLookup(x))
).Subscribe(Console.WriteLine);

Upvotes: 0
