vkg

Reputation: 715

Exception Handling in RX FromEvent<T> method

How can I handle an exception thrown while unsubscribing from a message handler?

var rawSource = Observable.FromEvent<EMSMessageHandler, EMSMessageEventArgs>(
            handler => ((sender, e) => handler(e)),
            a => this._topicSubscribers.ForEach( s => s.MessageHandler += a ),
            a => this._topicSubscribers.ForEach( s => s.MessageHandler -= a));
        return rawSource;

With this code I sometimes get an exception thrown from MessageHandler: "Illegalstateexception : {"Consumer is closed"}"

Upvotes: 0

Views: 376

Answers (2)

James World

Reputation: 29786

Using FromEvent like this is asking for trouble, for all the reasons Dave cited around serialization being required in Rx.

However, assuming events aren't raised concurrently within each event source (and I believe this is the case with an EMS MessageConsumer), I would just do your aggregation after FromEvent instead of within it, and let Rx do the heavy lifting:

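var sources = new List<IObservable<EMSMessageEventArgs>>();

// ToList() takes a snapshot so the loop is unaffected by later changes to the collection.
foreach(var topicSubscriber in this._topicSubscribers.ToList())
{
    var source = Observable.FromEvent<EMSMessageHandler, EMSMessageEventArgs>(
        handler => ((sender, e) => handler(e)),
        h => topicSubscriber.MessageHandler += h,
        h => topicSubscriber.MessageHandler -= h)
        .Synchronize();

    // Collect each per-subscriber observable so it can be merged below.
    sources.Add(source);
}

var rawSource = sources.Merge();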

This way, Merge takes care of correctly aggregating and serializing the individual sources. However, there could still be concurrency within the individual events. I don't think FromEvent itself is troubled by events being raised concurrently within an individual source, but Merge may not be so tolerant, in which case the use of Synchronize() above ensures serialization at the individual event source level as well as across the event sources.

Upvotes: 3

Dave Sexton

Reputation: 2652

Events typically don't throw, so it's possibly the wrong behavior at the source. If you can fix it at the source, then do so.

Otherwise, you'll either have to catch and swallow the error:

a => this._topicSubscribers.ForEach(s => 
{
  try
  {
    s.MessageHandler += a;
  }
  catch
  {
  }
})

which perhaps isn't ideal, or just don't use the FromEvent method:

return Observable.Create<EventPattern<EMSMessageEventArgs>>(observer =>
{
  EMSMessageHandler handler = (sender, e) => 
    observer.OnNext(new EventPattern<EMSMessageEventArgs>(sender, e));

  try
  {
    _topicSubscribers.ForEach(s => s.MessageHandler += handler);
  }
  catch (Exception ex)
  {
    try
    {
      _topicSubscribers.ForEach(s => s.MessageHandler -= handler);
    }
    catch { }

    observer.OnError(ex);
  }

  return Disposable.Create(() =>
  {
    try
    {
      _topicSubscribers.ForEach(s => s.MessageHandler -= handler);
    }
    catch { }
  });
});

Note that Rx requires serialized notifications (§4.2 in the Rx Design Guidelines) so you must ensure that all of the _topicSubscribers raise events sequentially, never concurrently. If you cannot, then you must synchronize all calls to observer.OnNext yourself, probably by acquiring a lock.

Update: To be clear, serialization is required regardless of whether you use FromEvent or Create, so even if you choose to simply swallow exceptions like my first example, you'll still need to make sure that the source never raises events concurrently; if you cannot, then you're forced to use my Create example with a lock anyway. FromEvent doesn't do this for you.
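For illustration, here is a minimal sketch of what "Create with a lock" might look like. It is not taken from the EMS API: the EMSMessageEventArgs, EMSMessageHandler and TopicSubscriber types below are hypothetical stand-ins so the snippet is self-contained, and the names SerializedEvents, FromSubscribers and gate are made up for the example. It also unsubscribes each source individually, an assumption on my part so that one closed consumer doesn't prevent the others from being detached.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical stand-ins for the EMS types used in the question.
public class EMSMessageEventArgs : EventArgs { }
public delegate void EMSMessageHandler(object sender, EMSMessageEventArgs e);
public class TopicSubscriber
{
    public event EMSMessageHandler MessageHandler;
    public void Raise() => MessageHandler?.Invoke(this, new EMSMessageEventArgs());
}

public static class SerializedEvents
{
    public static IObservable<EventPattern<EMSMessageEventArgs>> FromSubscribers(
        List<TopicSubscriber> topicSubscribers)
    {
        return Observable.Create<EventPattern<EMSMessageEventArgs>>(observer =>
        {
            // One lock per subscription: every notification passes through it,
            // so the observer never sees overlapping calls.
            var gate = new object();

            EMSMessageHandler handler = (sender, e) =>
            {
                lock (gate)
                {
                    observer.OnNext(new EventPattern<EMSMessageEventArgs>(sender, e));
                }
            };

            // Detach from each source separately and swallow failures
            // (for example, a consumer that has already been closed).
            Action unsubscribeAll = () =>
            {
                foreach (var s in topicSubscribers)
                {
                    try { s.MessageHandler -= handler; }
                    catch { }
                }
            };

            try
            {
                topicSubscribers.ForEach(s => s.MessageHandler += handler);
            }
            catch (Exception ex)
            {
                // Roll back any handlers that were attached, then report the failure.
                unsubscribeAll();
                lock (gate)
                {
                    observer.OnError(ex);
                }
                return Disposable.Empty;
            }

            return Disposable.Create(unsubscribeAll);
        });
    }
}
```

The lock only serializes notifications; it does not change which thread they arrive on, so a slow observer will block the event sources while it runs.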

Upvotes: 3
