user16095727

How to get elements from an IObservable with a delay before the start of the subscription?

I'm trying to insert a 1 s delay before the start of an observable sequence, but the first element is emitted immediately when I call Subscribe. In the example below, I pass a 1 s TimeSpan to Rx's Take operator and expect the first string in the array to arrive one second after I call Subscribe:

public static void Main(string[] args)
{
    IEnumerable<string> e = new[] { "Hi", "There", "Bye" };
    IObservable<string> strings = e.ToObservable();

    IObservable<string> stringsTimed = strings.Take(TimeSpan.FromMilliseconds(1000));
    stringsTimed.Trace("string");

    Console.ReadLine();
}

public static IDisposable Trace<T>(this IObservable<T> source, string name)
{
    return source.Subscribe
    (
        onNext: t => Console.WriteLine($"{name} -> {t}"),
        onError: ex => Console.WriteLine($"{name} ERROR: {ex.Message}"),
        onCompleted: () => Console.WriteLine($"{name} END")
    );
}

But when I run the program, "Hi", "There", and "Bye" are printed immediately, without any delay. How can I add a 1 s delay before receiving the first element, "Hi"?

Upvotes: 2

Views: 298

Answers (2)

user3259797

Take does not delay the subscription. The TimeSpan overload of Take limits how long values are taken: the sequence forwards elements for the given duration, measured from the start of the subscription, and then completes.

Try the code below, which uses DelaySubscription instead. If you want to use the Trace extension method from your post (rather than the Rx Subscribe extension method used below), end the chain after DelaySubscription(...) and call Trace on the result instead of Subscribe.

string name = "Sample";
IEnumerable<string> e = new[] { "Hi", "There", "Bye" };

e.ToObservable()
    .DelaySubscription(TimeSpan.FromMilliseconds(1000))
    .Subscribe( onNext: t => Console.WriteLine($"{name} -> {t}"),
                onError: ex => Console.WriteLine($"{name} ERROR: {ex.Message}"),
                onCompleted: () => Console.WriteLine($"{name} END"));
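
To make the behaviour of the Take(TimeSpan) overload visible, here is a minimal sketch that applies it to a timer-based source rather than an array. It assumes the System.Reactive NuGet package and C# top-level statements; the 300 ms interval, the "tick" label, and the ManualResetEventSlim used to keep the process alive are illustrative choices, not part of the original question.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

// Signalled from onCompleted so the program does not exit early.
using var done = new ManualResetEventSlim();

// Interval emits 0, 1, 2, ... every 300 ms (an arbitrary period for this sketch).
// Take(TimeSpan) forwards values for one second from subscription, then completes.
Observable.Interval(TimeSpan.FromMilliseconds(300))
    .Take(TimeSpan.FromSeconds(1))
    .Subscribe(
        onNext: i => Console.WriteLine($"tick -> {i}"),
        onCompleted: () =>
        {
            Console.WriteLine("tick END");
            done.Set();
        });

done.Wait(); // block until the one-second window has closed
```

Only the first few ticks should be printed before "tick END" appears, which shows that the TimeSpan bounds how long the sequence runs instead of postponing its start.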

Upvotes: 2

Guru Stron

Reputation: 141765

You can use Delay on the source, which shifts every element by the given time span:

var strings = e.ToObservable().Delay(TimeSpan.FromMilliseconds(1000));
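
For a cold source like this array, Delay and DelaySubscription produce much the same visible result, but they work differently: Delay subscribes to the source right away and shifts the notifications in time, whereas DelaySubscription waits before subscribing at all. The following is a rough sketch of that difference, assuming the System.Reactive package and top-level statements; the Source helper and its labels are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Threading;

var sw = Stopwatch.StartNew();

// Hypothetical helper: Defer runs its factory at the moment of subscription,
// so the log line shows when the underlying source is actually subscribed to.
IObservable<string> Source(string label) => Observable.Defer(() =>
{
    Console.WriteLine($"{sw.ElapsedMilliseconds} ms: {label} source subscribed");
    return new[] { "Hi", "There", "Bye" }.ToObservable();
});

// Delay: the source is subscribed immediately; its values are time-shifted.
Source("Delay")
    .Delay(TimeSpan.FromSeconds(1))
    .Subscribe(s => Console.WriteLine($"{sw.ElapsedMilliseconds} ms: Delay -> {s}"));

// DelaySubscription: the subscription to the source itself is postponed.
Source("DelaySubscription")
    .DelaySubscription(TimeSpan.FromSeconds(1))
    .Subscribe(s => Console.WriteLine($"{sw.ElapsedMilliseconds} ms: DelaySubscription -> {s}"));

Thread.Sleep(2000); // keep the process alive until both sequences have finished
```

The "source subscribed" line for Delay should appear almost immediately, while the one for DelaySubscription should appear only after about one second. The distinction matters mostly for hot sources, where values produced before the subscription would otherwise be missed.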

If you want to simulate messages arriving once per second for testing purposes, you can do something like the following:

string[] e = { "Hi", "There", "Bye" };
var strings = Observable.Generate(
    0,
    i => i < e.Length,
    i => i + 1,
    i => e[i],
    i => TimeSpan.FromSeconds(1)
);

Trace(strings, "timed");
Thread.Sleep(4000); // the third element is due at about 3 s, so wait a little longer to see it and END

static IDisposable Trace<T>(IObservable<T> source, string name)
{
    var sw = Stopwatch.StartNew();
    return source.Subscribe
    (
        onNext: t => Console.WriteLine($"{sw.ElapsedMilliseconds}: {name} -> {t}"),
        onError: ex => Console.WriteLine($"{name} ERROR: {ex.Message}"),
        onCompleted: () =>
        {
            sw.Stop();
            Console.WriteLine($"{sw.ElapsedMilliseconds}: {name} END");
        });
}

Upvotes: 1
