Reputation: 31
I have an application running an observable interval with multiple observers. Every 0.5s the interval loads some XML data from a web server, then the observers do some application specific processing on a background thread. Once the data isn't needed anymore, the subscriptions and the interval observable get disposed, so observer's OnNext/OnCompleted/OnError won't be called anymore. So far so good.
My problem: In some rare cases it is possible, that after calling Dispose my observer's OnNext method is still running! Before proceeding to further operations after disposing, I would like to ensure that OnNext has been completed.
My current solution: I've introduced a locker field in my observer class (see code). After disposing I try to acquire a lock and continue only after the lock has been acquired. While this solution works (?), it somehow just feels wrong to me.
Question: Is there a more elegant, more "Rx Way" to solve this problem?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RxExperimental
{
internal sealed class MyXmlDataFromWeb
{
public string SomeXmlDataFromWeb { get; set; }
}
internal sealed class MyObserver : IObserver<MyXmlDataFromWeb>
{
private readonly object _locker = new object();
private readonly string _observerName;
public MyObserver(string observerName) {
this._observerName = observerName;
}
public object Locker {
get { return this._locker; }
}
public void OnCompleted() {
lock (this._locker) {
Console.WriteLine("{0}: Completed.", this._observerName);
}
}
public void OnError(Exception error) {
lock (this._locker) {
Console.WriteLine("{0}: An error occured: {1}", this._observerName, error.Message);
}
}
public void OnNext(MyXmlDataFromWeb value) {
lock (this._locker) {
Console.WriteLine(" {0}: OnNext running on thread {1}... ", this._observerName, Thread.CurrentThread.ManagedThreadId);
Console.WriteLine(" {0}: XML received: {1}", this._observerName, value.SomeXmlDataFromWeb);
Thread.Sleep(5000); // simulate some long running operation
Console.WriteLine(" {0}: OnNext running on thread {1}... Done.", this._observerName, Thread.CurrentThread.ManagedThreadId);
}
}
}
internal sealed class Program
{
private static void Main() {
const int interval = 500;
//
var dataSource = Observable.Interval(TimeSpan.FromMilliseconds(interval), NewThreadScheduler.Default).Select(_ => {
var data = new MyXmlDataFromWeb {
SomeXmlDataFromWeb = String.Format("<timestamp>{0:yyyy.MM.dd HH:mm:ss:fff}</timestamp>", DateTime.Now)
};
return data;
}).Publish();
//
var observer1 = new MyObserver("Observer 1");
var observer2 = new MyObserver("Observer 2");
//
var subscription1 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer1);
var subscription2 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer2);
//
var connection = dataSource.Connect();
//
Console.WriteLine("Press any key to cancel ...");
Console.ReadLine();
//
subscription1.Dispose();
subscription2.Dispose();
connection.Dispose();
//
lock (observer1.Locker) {
Console.WriteLine("Observer 1 completed.");
}
lock (observer2.Locker) {
Console.WriteLine("Observer 2 completed.");
}
//
Console.WriteLine("Can only be executed, after all observers completed.");
}
}
}
Upvotes: 3
Views: 3700
Reputation: 39182
Yes, there's a more Rx-way of doing this.
The first observation is that unsubscribing from an observable stream is essentially independent from what is currently happening within the observer. There's not really any feedback. Since you have the requirement that you know definitively when the observations have ended, you need to model this into your observable stream. In other words, instead of unsubscribing from the stream, you should complete the stream so you will be able to observe the OnComplete
event. In your case, you can use TakeUntil
to end the observable instead of unsubscribing from it.
The second observation is that your main program needs to observe when your "observer" finishes its work. But since you made your "observer" an actual IObservable
, you don't really have a way to do this. This is a common source of confusion I see when people first start using Rx. If you model your "observer" as just another link in the observable chain, then your main program can observer. Specifically, your "observer" is nothing more than a mapping operation (with side effects) that maps incoming Xml data into "done" messages.
So if you refactor your code, you can get what you want...
public class MyObserver
{
private readonly string _name;
public MyObserver(string name) { _name = name; }
public IObservable<Unit> Handle(IObservable<MyXmlDataFromWeb source)
{
return source.Select(value =>
{
Thread.Sleep(5000); // simulate work
return Unit.Default;
});
}
}
// main
var endSignal = new Subject<Unit>();
var dataSource = Observable
.Interval(...)
.Select(...)
.TakeUntil(endSignal)
.Publish();
var observer1 = new MyObserver("Observer 1");
var observer2 = new MyObserver("Observer 2");
var results1 = observer1.Handle(dataSource.ObserveOn(...));
var results2 = observer2.Handle(dataSource.ObserveOn(...));
// since you just want to know when they are all done
// just merge them.
// use ToTask() to subscribe them and collect the results
// as a Task
var processingDone = results1.Merge(results2).Count().ToTask();
dataSource.Connect();
Console.WriteLine("Press any key to cancel ...");
Console.ReadLine();
// end the stream
endSignal.OnNext(Unit.Default);
// wait for the processing to complete.
// use await, or Task.Result
var numProcessed = await processingDone;
Upvotes: 5