Reputation: 715
How can I handle the exception thrown while unsubscribing from a message handler?
var rawSource = Observable.FromEvent<EMSMessageHandler, EMSMessageEventArgs>(
handler => ((sender, e) => handler(e)),
a => this._topicSubscribers.ForEach( s => s.MessageHandler += a ),
a => this._topicSubscribers.ForEach( s => s.MessageHandler -= a));
return rawSource;
With this code I sometimes get an exception thrown from MessageHandler: IllegalStateException: {"Consumer is closed"}
Upvotes: 0
Views: 376
Reputation: 29786
Using FromEvent like this is asking for trouble, for all the reasons Dave cited around serialization being required in Rx.
However, assuming events aren't raised concurrently within each event source (and I believe this is the case with an EMS MessageConsumer), I would just do your aggregation after FromEvent instead of within it, and let Rx do the heavy lifting:
var sources = new List<IObservable<EMSMessageEventArgs>>();
foreach(var topicSubscriber in this._topicSubscribers.ToList())
{
var source = Observable.FromEvent<EMSMessageHandler, EMSMessageEventArgs>(
handler => ((sender, e) => handler(e)),
h => topicSubscriber.MessageHandler += h,
h => topicSubscriber.MessageHandler -= h)
.Synchronize();
// Collect each per-subscriber stream so that Merge has something to combine.
sources.Add(source);
}
rawSource = sources.Merge();
This way the Merge will take care of correctly aggregating and serializing the individual sources. However, there could still be concurrency within the individual events. I actually don't think FromEvent is stressed by events being raised concurrently within an individual source. However, Merge may not be so tolerant, in which case the use of Synchronize() above ensures serialization at the individual event source level as well as across the event sources.
Upvotes: 3
Reputation: 2652
Events typically don't throw, so it's possibly the wrong behavior at the source. If you can fix it at the source, then do so.
Otherwise, you'll either have to catch and swallow the error:
a => this._topicSubscribers.ForEach(s =>
{
try
{
s.MessageHandler += a;
}
catch
{
}
})
which perhaps isn't ideal, or just don't use the FromEvent method:
return Observable.Create<EventPattern<EMSMessageEventArgs>>(observer =>
{
EMSMessageHandler handler = (sender, e) =>
observer.OnNext(new EventPattern<EMSMessageEventArgs>(sender, e));
try
{
_topicSubscribers.ForEach(s => s.MessageHandler += handler);
}
catch (Exception ex)
{
try
{
_topicSubscribers.ForEach(s => s.MessageHandler -= handler);
}
catch { }
observer.OnError(ex);
}
return Disposable.Create(() =>
{
try
{
_topicSubscribers.ForEach(s => s.MessageHandler -= handler);
}
catch { }
});
});
Note that Rx requires serialized notifications (§4.2 in the Rx Design Guidelines) so you must ensure that all of the _topicSubscribers raise events sequentially, never concurrently. If you cannot, then you must synchronize all calls to observer.OnNext yourself, probably by acquiring a lock.
Update: To be clear, serialization is required regardless of whether you use FromEvent or Create, so even if you choose to simply swallow exceptions like my first example, you'll still need to make sure that the source never raises events concurrently; if you cannot, then you're forced to use my Create example with a lock anyway. FromEvent doesn't do this for you.
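For illustration, here is a minimal sketch of what the Create approach with a lock might look like. It is not EMS code: FakeSubscriber is a hypothetical stand-in for the real consumer, gate and SerializedSource are illustrative names, and the sketch assumes the System.Reactive package is referenced. Only the locking around OnNext and the guarded detach are the point.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical stand-in for an EMS consumer; only the event shape matters here.
public sealed class FakeSubscriber
{
    public event EventHandler<string> MessageHandler;
    public void Raise(string message) => MessageHandler?.Invoke(this, message);
}

public static class SerializedSource
{
    public static IObservable<string> Create(IReadOnlyList<FakeSubscriber> subscribers)
    {
        return Observable.Create<string>(observer =>
        {
            // One lock per subscription, shared by every event source.
            var gate = new object();

            EventHandler<string> handler = (sender, message) =>
            {
                // Serialize OnNext across all sources, as Rx requires.
                lock (gate)
                {
                    observer.OnNext(message);
                }
            };

            foreach (var s in subscribers)
            {
                s.MessageHandler += handler;
            }

            return Disposable.Create(() =>
            {
                foreach (var s in subscribers)
                {
                    // Swallow per-source failures so that one closed consumer
                    // does not prevent detaching from the others.
                    try { s.MessageHandler -= handler; }
                    catch (Exception) { }
                }
            });
        });
    }
}

public static class Program
{
    public static void Main()
    {
        var subscribers = new[] { new FakeSubscriber(), new FakeSubscriber() };
        using (SerializedSource.Create(subscribers).Subscribe(m => Console.WriteLine(m)))
        {
            subscribers[0].Raise("first");
            subscribers[1].Raise("second");
        }
        // The handler has been detached, so this message is not observed.
        subscribers[0].Raise("after dispose");
    }
}
```

Detaching each source inside its own try/catch, rather than wrapping the whole loop, means a consumer that is already closed only loses its own unsubscribe; whether swallowing that error is acceptable depends on your application.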
Upvotes: 3