Bercovici Adrian
Bercovici Adrian

Reputation: 9360

Observer OnError delegate is not called multiple times

Hello i am trying to understand why the OnError does not get called everytime by the Observer. When trying to debug the OnError called inside my IOBservable implementation , the debugger passes by.

Observable implementation

public class Reactive:IStorage,IObservable<SquareResult>
        {
            private object @lock = new object();
            private List<IObserver<SquareResult>> observers = new List<IObserver<SquareResult>>();


            public static Reactive Create()
            {
                return new Reactive();
            }

            public void Enqueue(SquareResult square)
            {
                lock (@lock)
                {
                    foreach (var item in observers)
                    {
                        if (square.Result < 0)
                        {
                            item.OnError(new InvalidSquareException());
                        }
                        else
                        item.OnNext(square);
                    }
                }
            }
            public void EndStoring()
            {
                this.observers.ForEach(obs => obs.OnCompleted());
            }

            public IDisposable Subscribe(IObserver<SquareResult> observer)
            {
                if (!this.observers.Contains(observer))
                {
                    this.observers.Add(observer);
                }
                return new Unsubscriber(observer, this.observers);
            }

            public class Unsubscriber : IDisposable
            {
                private List<IObserver<SquareResult>> observers;
                private IObserver<SquareResult> observer;

                public Unsubscriber(IObserver<SquareResult>obs,List<IObserver<SquareResult>>observers)
                {
                    this.observer = obs;
                    this.observers = observers;
                }

                public void Dispose()
                {
                    if (this.observer == null)
                    {
                        return;
                    }
                    this.observers.Remove(this.observer);
                }
            }
        }

The only method that is called by the producer is Enqueue.

Then we have a service in which the consumer takes data from the IObservable and sends it over the socket:

Consumer

class ProgressService
    {
        private IStorage observable;
        public ProgressService(IStorage storage)
        {
            this.observable = storage;
        }

        public async Task NotifyAsync(WebSocket socket)
        {

            byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
            CancellationTokenSource cancelSignal = new CancellationTokenSource();
            this.observable.Subscribe(async (next) =>
            {

                try
                {
                    ReadOnlyMemory<byte> data = next.Encode();
                    await socket.SendAsync(data, WebSocketMessageType.Text, true, CancellationToken.None);
                }
                catch (Exception)
                {
                    if (socket.State == WebSocketState.Open)
                    {

                        await socket.CloseAsync(WebSocketCloseStatus.InternalServerError, "Threw on data receive", CancellationToken.None);
                    }
                    return;
                }
            }, async(ex) =>
            {
                 //!! this delegate does not get called everytime the observable calls OnError(exception)
                await socket.SendAsync($"Exception :{ex.Message}".Encode().ToArray(), WebSocketMessageType.Text, true, CancellationToken.None);
            }, async () =>
            {
                await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Finished test", CancellationToken.None);
            }, cancelSignal.Token);

            await socket.ReceiveAsync(buffer, CancellationToken.None);
            cancelSignal.Cancel();
        }


    }

So what basically happens is if i call from somewhere myobservable.OnError(new exception()) a couple of times, the callback of the consumer is called only once and i do not understand why.

Example:

observable.OnError(new Exception()); observable.OnError(new Exception());

Observer.OnError implmenetation

public void OnError(Exception ex)
{
  Console.WriteLine(ex.Message);
 }

The above example in my code would print the exception message just once.

Upvotes: 0

Views: 157

Answers (1)

Shlomo
Shlomo

Reputation: 14350

The 'Observable contract' allows 0 or more OnNext notifications, followed by one OnCompleted or one OnError notification, which terminate the observable. Once the observable is terminated (by the first OnError in your case), no more notifications are emitted, meaning no more delegates are called.

Upvotes: 2

Related Questions