Reputation: 4286
This might be a stupid question, as I'm a bit new to Rx :)
I'm sampling an event (Rx for .NET 4.0):
eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Timestamp().Subscribe(x =>Console.WriteLine("testing:" + x.Value.EventArgs.str));
The problem is that the sampling time needs to be changeable on the fly. I guess I could make a property that removes the existing handler and creates a new one when the value changes, but that seems messy and more vulnerable to timing issues. Is there a way to simply change the interval?
Example: say someone is typing a string of characters. When a certain sequence is detected, you want to change the sampling time without missing an event, and preferably without receiving any event more than once.
Upvotes: 6
Views: 1961
Reputation: 827
TL;DR: Create an Observable using ObservableFromIntervalFunctor, as is shown below:
void Main()
{
    // Pick an initial period; it can be changed later.
    var intervalPeriod = TimeSpan.FromSeconds(1);
    // Create an observable using a functor that captures the interval period.
    var o = ObservableFromIntervalFunctor(() => intervalPeriod);
    // Log every value so we can visualize the observable.
    o.Subscribe(Console.WriteLine);
    // Sleep for a while so you can observe the observable.
    Thread.Sleep(TimeSpan.FromSeconds(5.0));
    // Changing the interval period takes effect on the next tick.
    intervalPeriod = TimeSpan.FromSeconds(0.3);
}
IObservable<long> ObservableFromIntervalFunctor(Func<TimeSpan> intervalPeriodFunctor)
{
    return Observable.Generate(0L, s => true, s => s + 1, s => s, s => intervalPeriodFunctor());
}
Explanation: Observable.Generate has an overload that takes a functor returning the delay before the next value is generated. By passing a functor that captures a TimeSpan variable, you can change the observable's interval period simply by changing the captured variable.
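To apply this to the original question, the variable-period observable can presumably serve as the sampler, since Sample has an overload that takes another observable as its tick source. What follows is a minimal sketch, assuming the System.Reactive package. The class name VariableSampling is illustrative, and source is a hypothetical stand-in for eventAsObservable; neither appears in the original.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class VariableSampling
{
    // Same helper as above: the delay before each tick is re-read from the functor.
    public static IObservable<long> ObservableFromIntervalFunctor(Func<TimeSpan> intervalPeriodFunctor)
    {
        return Observable.Generate(0L, s => true, s => s + 1, s => s, s => intervalPeriodFunctor());
    }

    public static void Main()
    {
        var intervalPeriod = TimeSpan.FromSeconds(1);

        // Hypothetical stand-in for eventAsObservable: one value every 100 ms.
        var source = Observable.Interval(TimeSpan.FromMilliseconds(100));

        // Sample the source whenever the variable-period observable ticks.
        using (source
            .Sample(ObservableFromIntervalFunctor(() => intervalPeriod))
            .Timestamp()
            .Subscribe(x => Console.WriteLine("testing:" + x.Value)))
        {
            Thread.Sleep(TimeSpan.FromSeconds(3));
            // Shorten the period; later ticks are scheduled with the new value.
            intervalPeriod = TimeSpan.FromSeconds(0.3);
            Thread.Sleep(TimeSpan.FromSeconds(3));
        }
    }
}
```

Because the delay is only re-read when a tick is scheduled, a tick that is already pending still fires on the old period, so a switch from a long period to a short one can lag by up to one long period.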
Upvotes: 2
Reputation: 84734
I know this question has already been answered, but I thought I'd add another few ways of tackling it in an Rx way.
You could use Switch on a sequence of TimeSpans:
private Subject<TimeSpan> sampleFrequencies = new Subject<TimeSpan>();
sampleFrequencies
    .Select(x => eventAsObservable.Sample(Observable.Interval(x)).Timestamp())
    .Switch()
    .Subscribe(x => Console.WriteLine("testing:" + x.Value.EventArgs.str));
// To change:
// sampleFrequencies.OnNext(TimeSpan.FromSeconds(5));
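One thing to watch for with the snippet above: a plain Subject emits nothing until the first OnNext, so no sampling happens until a frequency has been pushed. A sketch of one way around that, assuming the System.Reactive package and a BehaviorSubject seeded with the initial interval. SwitchingSampler, SampleWith and source are hypothetical names introduced for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class SwitchingSampler
{
    // Builds a freshly sampled sequence for each new period and switches to it.
    public static IObservable<T> SampleWith<T>(IObservable<T> source, IObservable<TimeSpan> periods)
    {
        return periods
            .Select(period => source.Sample(period))
            .Switch();
    }

    public static void Main()
    {
        // Hypothetical stand-in for eventAsObservable. Observable.Interval is cold,
        // so each switch restarts it from 0; an event-based source would not restart.
        var source = Observable.Interval(TimeSpan.FromMilliseconds(100));

        // The seed value means sampling starts as soon as someone subscribes.
        var sampleFrequencies = new BehaviorSubject<TimeSpan>(TimeSpan.FromSeconds(1));

        using (SampleWith(source, sampleFrequencies)
            .Timestamp()
            .Subscribe(x => Console.WriteLine("testing:" + x.Value)))
        {
            Thread.Sleep(TimeSpan.FromSeconds(3));
            // Switch to a faster sampling rate.
            sampleFrequencies.OnNext(TimeSpan.FromSeconds(0.3));
            Thread.Sleep(TimeSpan.FromSeconds(3));
        }
    }
}
```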
Alternatively, it could also be solved using Defer, TakeUntil and Repeat (this one is a little crazier and is included as a thought exercise):
private TimeSpan sampleFrequency = TimeSpan.FromSeconds(2);
private Subject<Unit> frequencyChanged = new Subject<Unit>();
Observable
    .Defer(() => eventAsObservable
        .Sample(Observable.Interval(sampleFrequency))
        .Timestamp()
        .TakeUntil(frequencyChanged))
    .Repeat()
    .Subscribe(x => Console.WriteLine("testing:" + x.Value.EventArgs.str));
// To change:
// sampleFrequency = TimeSpan.FromSeconds(5);
// frequencyChanged.OnNext(new Unit());
Upvotes: 6
Reputation: 1500055
I don't know of a way of changing the existing sampling interval, but what you could do is sample at the highest frequency you'll need, and then filter with a Where clause which uses a variable you can change.
For example:
static IObservable<T> SampleEvery<T>(this IObservable<T> source,
                                     Func<int> multipleProvider)
{
    int counter = 0;
    Func<T, bool> predicate = ignored => {
        counter++;
        if (counter >= multipleProvider())
        {
            counter = 0;
        }
        return counter == 0;
    };
    return source.Where(predicate);
}
You'd then call it like this:
// Keep this somewhere you can change it
int multiple = 1;
eventAsObservable.Sample(TimeSpan.FromSeconds(1))
    .SampleEvery(() => multiple)
    .Timestamp()
    .Subscribe(x => Console.WriteLine("testing:" +
                                      x.Value.EventArgs.str));
Now, changing the value of multiple will change the effective sampling frequency.
It's a pretty ugly hack, but I think it should work.
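To see what the counter does in isolation, here is a small self-contained check, assuming the System.Reactive package. SamplingExtensions and EveryNth are hypothetical wrapper names, and Observable.Range stands in for the already-sampled event stream.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class SamplingExtensions
{
    // Same operator as above: lets every Nth item through, where N is re-read per item.
    public static IObservable<T> SampleEvery<T>(this IObservable<T> source, Func<int> multipleProvider)
    {
        int counter = 0;
        return source.Where(ignored =>
        {
            counter++;
            if (counter >= multipleProvider())
            {
                counter = 0;
            }
            return counter == 0;
        });
    }

    // Hypothetical helper: runs the operator over 1..count and collects what passes.
    public static IList<int> EveryNth(int count, int multiple)
    {
        return Observable.Range(1, count)
            .SampleEvery(() => multiple)
            .ToList()
            .Wait();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", EveryNth(9, 3))); // 3,6,9
        Console.WriteLine(string.Join(",", EveryNth(4, 1))); // 1,2,3,4
    }
}
```

With a base sample of one second, a multiple of 3 therefore gives an effective interval of three seconds, and the interval can only ever be a whole multiple of the base period.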
Upvotes: 8
Reputation: 74654
Why don't you just subscribe twice?
Observable.Merge(
    eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Timestamp().SelectMany(x => doLocalLookup(x)),
    eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Timestamp().SelectMany(x => doRemoteLookup(x))
).Subscribe(Console.WriteLine);
Or if searches are only active based on some sort of prefix or qualifier like Google Chrome's '?' operator:
Observable.Merge(
    eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Where(x => isLocal(x)).SelectMany(x => doLocalLookup(x)),
    eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Where(x => isARemoteQuery(x)).SelectMany(x => doRemoteLookup(x))
).Subscribe(Console.WriteLine);
Upvotes: 0