NN_
NN_

Reputation: 1593

Delay on condition

There is a code:

someObservable.Select(x => getY(x));    
Y getY(X x)
{
  if (x.Value == X.ABC)
    return new Y(1);
  else
    return new Y(2);
}

On some condition I need to double check x.Value after some period of time. The simplest and bad solution is to use Thread.Sleep:

Y getY(X x)
{
  if (x.Value == X.ABC)
    return new Y(1);
  else
  if (x.SomethingElse == true)
  {
     Thread.Sleep(timeout);
     if (x.Value == X.ABC)
       return new Y(1);
     else
       return new Y(2);
  }
}

What is the correct code here? I need events to be ordered the same way I receive. It means if I have a delay and I get a new value it has to wait to be processed.

Upvotes: 1

Views: 519

Answers (2)

Asti
Asti

Reputation: 12667

If this is a general problem, I suggest you implement a custom operator. The delay method has an overload which accepts another observable to control the delay timing. In the example, even numbers push immediately while odd numbers are delayed;

Observable.Interval(TimeSpan.FromSeconds(1))
          .Delay(i => (i % 2 == 0) ? Observable.Return(0L) : Observable.Timer(TimeSpan.FromSeconds(0.9)))
          .Subscribe(Console.WriteLine);

EDIT: Here is a simpler, order preserving operator for delay:

  static IObservable<T> DelayOrdered<T>(this IObservable<T> observable, Func<T, TimeSpan> delaySelector, IScheduler scheduler = default(IScheduler))
        {
            scheduler = scheduler ?? DefaultScheduler.Instance;
            return Observable.Create<T>(observer =>
            {
                var now = scheduler.Now;

                return observable
                       .Subscribe(value =>
                {
                    now = now.Add(delaySelector(value));
                    scheduler.Schedule(now, () => observer.OnNext(value));
                });
            });
        }

Usage:

  Observable.Interval(TimeSpan.FromSeconds(0.2))
                      .DelayOrdered(i => (i % 2 == 0) ? TimeSpan.Zero : TimeSpan.FromSeconds(1))
                      .Subscribe(Console.WriteLine);

Order will be preserved because the absolute scheduled time can only increase.

Upvotes: 1

NN_
NN_

Reputation: 1593

The solution (from https://rsdn.org/forum/dotnet/6629370.1) is to return IObservable rather than Y in getY and make Observable.Delay with Concat.

IObservable<Y> getY(X x)
{
  if (x.Value == X.ABC)
    return Observable.Return(new Y(1));
  else
  if (x.SomethingElse == true)
  {
     return Observable.Delay(Observable.Return(x), timeout).Select(xx => xx.Value == X.ABC ? new Y(1) : new Y(2));
  }
}

or

IObservable<Y> getY(X x)
{
  return Observable.Create<Y>(async (obs, token) =>
  {
    if (x.Value == X.ABC)
        obs.OnNext(new Y(1));
    else
    if (x.SomethingElse == true)
    {
        await Task.Delay(timeout, token);
        if (x.Value == X.ABC)
           obs.OnNext(new Y(1));
        else
           obs.OnNext(new Y(2));
     }
   }
 }

And then: someObservable.Select(x => getY(x)).Concat();

Upvotes: 2

Related Questions