Reputation:
I'm attempting to insert a 1s delay prior to the start of an Observable
sequence, but the first record is emitted immediately upon calling Subscribe
. In the example below, I attempt to use RX's Take
operator by passing a 1s TimeSpan
and expect to receive the first string in the array delayed when I call Subscribe
...
public static void Main(string[] args)
{
IEnumerable<string> e = new[] { "Hi", "There", "Bye" };
IObservable<string> strings = e.ToObservable();
IObservable<string> stringsTimed = strings.Take(TimeSpan.FromMilliseconds(1000));
stringsTimed.Trace("string");
Console.ReadLine();
}
public static IDisposable Trace<T>(this IObservable<T> source, string name)
{
return source.Subscribe
(
onNext: t => Console.WriteLine($"{name} -> {t}"),
onError: ex => Console.WriteLine($"{name} ERROR: {ex.Message}"),
onCompleted: () => Console.WriteLine($"{name} END")
);
}
but when I run the program, the result of "Hi", "There", "Bye" prints on the screen immediately, without any delay, so how can I add 1 sec delay before receiving the first element "Hi"?
Upvotes: 2
Views: 298
Reputation:
Take does not delay the subscription. The TimeSpan
overload of Take
sets the duration the observable will observe/take records before continuing.
Try the code below. If you want to use the extension method from your post (as opposed to the RX Subscribe
extension method below) - put a semicolon after DelaySubscription
...
string name = "Sample";
IEnumerable<string> e = new[] { "Hi", "There", "Bye" };
e.ToObservable()
.DelaySubscription(TimeSpan.FromMilliseconds(1000))
.Subscribe( onNext: t => Console.WriteLine($"{name} -> {t}"),
onError: ex => Console.WriteLine($"{name} ERROR: {ex.Message}"),
onCompleted: () => Console.WriteLine($"{name} END"));
Upvotes: 2
Reputation: 141765
You use Delay
on the source:
var strings = e.ToObservable().Delay(TimeSpan.FromMilliseconds(1000));
If you want to simulate messages incoming once per second for testing purposes you can do something like the following:
string[] e = { "Hi", "There", "Bye" };
var strings = Observable.Generate(
0,
i => i < e.Length,
i => i + 1,
i => e[i],
i => TimeSpan.FromSeconds(1)
);
Trace(strings, "timed");
Thread.Sleep(3000);
static IDisposable Trace<T>(IObservable<T> source, string name)
{
var sw = Stopwatch.StartNew();
return source.Subscribe
(
onNext: t => Console.WriteLine($"{sw.ElapsedMilliseconds}: {name} -> {t}"),
onError: ex => Console.WriteLine($"{name} ERROR: {ex.Message}"),
onCompleted: () =>
{
sw.Stop();
Console.WriteLine($"{sw.ElapsedMilliseconds}: {name} END");
});
}
Upvotes: 1