Reputation: 1593
I have the following code:
someObservable.Select(x => getY(x));
Y getY(X x)
{
    if (x.Value == X.ABC)
        return new Y(1);
    else
        return new Y(2);
}
Under some conditions I need to re-check x.Value after a period of time. The simplest, but bad, solution is to use Thread.Sleep:
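Y getY(X x)
{
    if (x.Value == X.ABC)
        return new Y(1);
    if (x.SomethingElse)
    {
        // Blocks the calling thread, then re-checks the value.
        Thread.Sleep(timeout);
        if (x.Value == X.ABC)
            return new Y(1);
    }
    // Every remaining path yields Y(2), so all code paths return a value.
    return new Y(2);
}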
What is the correct way to write this? I need the results to stay in the same order as the incoming events: if one value is being delayed and a new value arrives, the new value has to wait until the delayed one has been processed.
Upvotes: 1
Views: 519
Reputation: 12667
If this is a general problem, I suggest you implement a custom operator. The Delay method has an overload that accepts a selector returning another observable, which controls the delay for each value. In this example, even numbers are pushed immediately while odd numbers are delayed:
Observable.Interval(TimeSpan.FromSeconds(1))
    .Delay(i => (i % 2 == 0) ? Observable.Return(0L) : Observable.Timer(TimeSpan.FromSeconds(0.9)))
    .Subscribe(Console.WriteLine);
EDIT: Here is a simpler, order-preserving delay operator:
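// Extension method: must be declared inside a static class.
static IObservable<T> DelayOrdered<T>(this IObservable<T> observable, Func<T, TimeSpan> delaySelector, IScheduler scheduler = default(IScheduler))
{
    scheduler = scheduler ?? DefaultScheduler.Instance;
    return Observable.Create<T>(observer =>
    {
        // Due time of the most recently scheduled value.
        var lastDue = scheduler.Now;
        return observable
            .Subscribe(value =>
            {
                // Delay relative to arrival, but never earlier than the previous value's due time.
                var due = scheduler.Now.Add(delaySelector(value));
                if (due < lastDue) due = lastDue;
                lastDue = due;
                scheduler.Schedule(due, () => observer.OnNext(value));
            });
    });
}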
Usage:
Observable.Interval(TimeSpan.FromSeconds(0.2))
    .DelayOrdered(i => (i % 2 == 0) ? TimeSpan.Zero : TimeSpan.FromSeconds(1))
    .Subscribe(Console.WriteLine);
Order is preserved because the absolute scheduled time never decreases from one value to the next.
Upvotes: 1
Reputation: 1593
The solution (from https://rsdn.org/forum/dotnet/6629370.1) is to have getY return IObservable<Y> rather than Y, use Observable.Delay for the re-check, and flatten the results with Concat.
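IObservable<Y> getY(X x)
{
    if (x.Value == X.ABC)
        return Observable.Return(new Y(1));
    if (x.SomethingElse)
    {
        // Re-check x.Value once the timeout has elapsed.
        return Observable.Delay(Observable.Return(x), timeout).Select(xx => xx.Value == X.ABC ? new Y(1) : new Y(2));
    }
    // No re-check needed, so all code paths return a value.
    return Observable.Return(new Y(2));
}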
or
IObservable<Y> getY(X x)
{
    return Observable.Create<Y>(async (obs, token) =>
    {
        if (x.Value == X.ABC)
            obs.OnNext(new Y(1));
        else if (x.SomethingElse)
        {
            // Asynchronous wait; cancelled if the subscription is disposed.
            await Task.Delay(timeout, token);
            obs.OnNext(x.Value == X.ABC ? new Y(1) : new Y(2));
        }
        else
            obs.OnNext(new Y(2));
    });
}
And then: someObservable.Select(x => getY(x)).Concat();
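For anyone who wants to try this end to end, here is a minimal sketch of the Delay + Concat variant. It assumes the System.Reactive NuGet package is referenced. The X class, the GetY signature with an explicit timeout, and the OrderedRecheckDemo name are hypothetical stand-ins, and plain ints 1 and 2 replace new Y(1) and new Y(2) to keep it short.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

// Hypothetical stand-in for the question's X type.
public sealed class X
{
    public const int ABC = 1;
    public volatile int Value;      // volatile: written on one thread, re-checked on another
    public bool SomethingElse;
}

public static class OrderedRecheckDemo
{
    // Stand-in for getY: emits 1 where the answer emits new Y(1), and 2 for new Y(2).
    public static IObservable<int> GetY(X x, TimeSpan timeout)
    {
        if (x.Value == X.ABC)
            return Observable.Return(1);
        if (!x.SomethingElse)
            return Observable.Return(2);
        // Cold sequence: the delay only starts when Concat subscribes to it.
        return Observable.Return(x)
            .Delay(timeout)
            .Select(xx => xx.Value == X.ABC ? 1 : 2);
    }

    public static IList<int> Run()
    {
        var timeout = TimeSpan.FromMilliseconds(200);
        var source = new Subject<X>();

        // ToTask subscribes immediately and completes with the collected list.
        var resultsTask = source
            .Select(x => GetY(x, timeout))
            .Concat()
            .ToList()
            .ToTask();

        var late = new X { Value = 0, SomethingElse = true };
        source.OnNext(late);                 // not ABC yet, so a delayed re-check is queued
        source.OnNext(new X { Value = 0 });  // arrives during the delay and has to wait its turn
        late.Value = X.ABC;                  // changes before the re-check fires
        source.OnCompleted();

        return resultsTask.Result;           // blocks until both results are in
    }
}
```

With these inputs, OrderedRecheckDemo.Run() should return 1 followed by 2: the first value is re-checked after the timeout and sees ABC, and the second value, although its result is available immediately, is only emitted after the first one. Note that the Select lambda (and so the first check in GetY) still runs as soon as each value arrives; only the subscription to the returned sequence is deferred by Concat.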
Upvotes: 2