Reputation: 1091
I am trying to use Observable.Interval combined with Select to poll a service, which seems like a syntactically nice way to do it. However, whenever I try to wait for the observable to complete, I get strange behaviour: the service called inside the Select is invoked multiple times.
With no waiting I get the behaviour I am looking for:
Code
private static ConcurrentQueue<string> _data = new ConcurrentQueue<string>(new [] { "a", "b", "c", "d" });

static void Main(string[] args)
{
    var observable = Observable
        .Interval(TimeSpan.FromSeconds(2))
        .Select(Transform)
        .TakeWhile(x => x != null);

    Console.WriteLine("Starting subcription");
    var disposable = observable.Subscribe(x => Console.WriteLine("Event raised for {0}", x));
    Console.WriteLine("Waiting for subcription to complete");
    // need to wait here
    Console.WriteLine("Press any key to exit. . .");
    Console.ReadKey();
}

private static string Transform(long x)
{
    string result;
    _data.TryDequeue(out result);
    Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL");
    return result;
}
Output
Starting subcription
Waiting for subcription to complete
Press any key to exit. . .
Transform invoked [x: 0, Result: a]
Event raised for a
Transform invoked [x: 1, Result: b]
Event raised for b
Transform invoked [x: 2, Result: c]
Event raised for c
Transform invoked [x: 3, Result: d]
Event raised for d
Transform invoked [x: 4, Result: NULL]
If I call the Wait extension method on the observable, Transform seems to be invoked twice per interval, and only one of the two values reaches the subscriber:
Code
private static ConcurrentQueue<string> _data = new ConcurrentQueue<string>(new [] { "a", "b", "c", "d" });

static void Main(string[] args)
{
    var observable = Observable
        .Interval(TimeSpan.FromSeconds(2))
        .Select(Transform)
        .TakeWhile(x => x != null);

    Console.WriteLine("Starting subcription");
    var disposable = observable.Subscribe(x => Console.WriteLine("Event raised for {0}", x));
    Console.WriteLine("Waiting for subcription to complete");
    observable.Wait();
    Console.WriteLine("Press any key to exit. . .");
    Console.ReadKey();
}

private static string Transform(long x)
{
    string result;
    _data.TryDequeue(out result);
    Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL");
    return result;
}
Output
Starting subcription
Waiting for subcription to complete
Transform invoked [x: 0, Result: a]
Event raised for a
Transform invoked [x: 0, Result: b]
Transform invoked [x: 1, Result: c]
Event raised for c
Transform invoked [x: 1, Result: d]
Transform invoked [x: 2, Result: NULL]
Transform invoked [x: 2, Result: NULL]
Press any key to exit. . .
I suspect this is because Wait creates a second subscription behind the scenes, and the service behind my observable is stateful.
I've seen people recommend using ToTask to wait for an observable to complete, but this shows the same strange behaviour.
So what is the correct way to poll a stateful service behind an observable while all subscribers receive the same set of data?
Upvotes: 1
Views: 95
Reputation: 14350
A couple of things:
Try to limit side effects to Subscribe, not other operators like Select. I'm not sure that's practical for your example though.
Your observable is cold, so each subscription gets its own Interval timer, its own Select operator, etc. You have two subscriptions, one from Subscribe, the other from Wait, so you'll have two timers, with two Select operators attached calling Transform. You can fix this in one of two ways: make the observable hot, or subscribe only once.
Your observable as a hot observable would look like this:
var observable = Observable
    .Interval(TimeSpan.FromSeconds(2))
    .Select(Transform)
    .TakeWhile(x => x != null)
    .Publish()
    .RefCount();
My suggestion: since you're changing state inside your observable, make it hot so that the state-changing code never runs twice.
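To make the cold-versus-shared difference concrete, here is a minimal sketch that counts how often a Select projection runs when two subscribers attach. It assumes the System.Reactive package; the class ColdVsHotDemo and the helper CountSelectorCalls are hypothetical names for illustration, and Observable.Range stands in for Observable.Interval so the demo finishes immediately.

```csharp
using System;
using System.Reactive.Linq;

public static class ColdVsHotDemo
{
    // Hypothetical helper: returns how many times the Select projection
    // runs when two subscribers attach to the same pipeline.
    public static int CountSelectorCalls(bool share)
    {
        int calls = 0;

        // Range is a stand-in for Interval (assumption, to avoid waiting on timers).
        var source = Observable
            .Range(0, 3)
            .Select(x => { calls++; return x; });

        if (share)
        {
            // Publish wraps the source so both subscribers share one upstream subscription.
            var published = source.Publish();
            published.Subscribe(_ => { });
            published.Subscribe(_ => { });
            published.Connect(); // subscribes to the source exactly once
        }
        else
        {
            // Cold: each Subscribe call runs the whole pipeline again.
            source.Subscribe(_ => { });
            source.Subscribe(_ => { });
        }

        return calls;
    }

    public static void Main()
    {
        Console.WriteLine("Cold:   {0} selector calls", CountSelectorCalls(share: false));
        Console.WriteLine("Shared: {0} selector calls", CountSelectorCalls(share: true));
    }
}
```

With three source values, the cold case should report 6 calls and the shared case 3. The sketch uses Publish with an explicit Connect rather than RefCount because the stand-in source completes synchronously: with RefCount, the first subscriber would drain the sequence before the second one attaches, and the second would then trigger a fresh upstream subscription. With a timer-based source such as Interval, subscribers that attach before the first tick share the same values.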
Upvotes: 2
Reputation: 1011
Make sure you only subscribe once to the observable. Omit the first call to Subscribe and just leave the call to Wait. If you still want to emit some log messages (as you've done in your subscription), add a Do-step:
private static ConcurrentQueue<string> _data = new ConcurrentQueue<string>(new [] { "a", "b", "c", "d" });

static void Main(string[] args)
{
    var observable = Observable
        .Interval(TimeSpan.FromSeconds(2))
        .Select(Transform)
        .TakeWhile(x => x != null);

    Console.WriteLine("Wait for the observable to complete.");
    observable
        .Do(x => Console.WriteLine("Event raised for {0}", x))
        .Wait();
    Console.WriteLine("Press any key to exit. . .");
    Console.ReadKey();
}

private static string Transform(long x)
{
    string result;
    _data.TryDequeue(out result);
    Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL");
    return result;
}
Note that Wait will block (which is inevitable in a Main method). Also, it will throw when your observable is empty. If you are not interested in any values of the observable, add a LastOrDefault-step.
Waiting for an observable is an inherently async operation, so you should check out whether it's possible to use ToTask instead of Wait and await it in an async method.
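As a rough sketch of the ToTask approach, assuming System.Reactive and a compiler that supports async Main (C# 7.1 or later): the class AsyncPollingDemo and the helper PollUntilEmptyAsync are hypothetical names, and the queue lookup is inlined in place of Transform. LastOrDefaultAsync is used here so that an empty sequence yields null rather than an exception.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class AsyncPollingDemo
{
    // Hypothetical helper: polls the queue once per period until it is empty
    // and returns the last value seen, or null if there was none.
    public static Task<string> PollUntilEmptyAsync(ConcurrentQueue<string> queue, TimeSpan period)
    {
        return Observable
            .Interval(period)
            .Select(_ => queue.TryDequeue(out var item) ? item : null)
            .TakeWhile(x => x != null)
            // Logging goes in a Do step so no extra Subscribe call is needed.
            .Do(x => Console.WriteLine("Event raised for {0}", x))
            // Emits default(string) for an empty sequence instead of failing.
            .LastOrDefaultAsync()
            // ToTask creates the one and only subscription.
            .ToTask();
    }

    public static async Task Main()
    {
        var queue = new ConcurrentQueue<string>(new[] { "a", "b", "c", "d" });

        Console.WriteLine("Waiting for the observable to complete.");
        var last = await PollUntilEmptyAsync(queue, TimeSpan.FromSeconds(2));
        Console.WriteLine("Completed; last value: {0}", last ?? "NULL");
    }
}
```

Because the pipeline is subscribed to exactly once (by ToTask), the stateful dequeue runs once per tick, and the calling thread is not blocked while the polling is in progress.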
Upvotes: 1