mark

Reputation: 62764

How to hold an exception in an IObservable pipeline and re-throw it at the end?

I have the following method:

public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    return m_namespaceManager
        .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
        .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
        .Merge(maxConcurrentCalls)
        .Where(IsValid)
        .Select(ToWorkItem)
        .Where(o => o != null);
}

It implements the following logic:

  1. Enter the monad by obtaining IObservable<NamespaceConnectionInfo> from the namespace manager (GetNamespaceConnectionInfoSource).
  2. As namespaces become available, obtain the IObservable<DataManagementPolicy> for each namespace (GetPolicySourceForNamespace), using the Merge operator to limit the number of concurrent calls to GetPolicySourceForNamespace.
  3. Filter out bad DataManagementPolicy records (this cannot be done in SQL).
  4. Translate the seemingly good DataManagementPolicy records into DataManagementWorkItem instances. Some of these may turn out to be null, so they are filtered out at the end.

GetNamespaceConnectionInfoSource can fault after having produced a certain number of valid NamespaceConnectionInfo objects. By that time, some DataManagementWorkItem objects may well have been produced in the final observable sequence already.

I have a unit test, where:

I am also interested in examining the items produced by the final observable before it faults:

var dm = DependencyResolver.Instance.GetInstance<IDataManagement>();
var workItems = new List<DataManagementWorkItem>();
try
{
    var obs = dm.GetWorkItemSource(10);
    obs.Subscribe(wi => workItems.Add(wi));
    await obs;
    Assert.Fail("An expected exception was not thrown");
}
catch (Exception exc)
{
    AssertTheRightException(exc);
}

The workItems collection has a different number of items every time: 69 on one run, 50 on another, 18 on yet another.

My interpretation is that when the fault occurs, there are good NamespaceConnectionInfo and DataManagementPolicy objects in various phases of processing, and all of them are aborted because of the fault. The number differs each time because the items are produced asynchronously.

And here lies my problem: I do not want them to be aborted. I want them to run to completion and be produced in the final observable sequence, and only then should the fault be communicated. In essence, I want to hold the exception and re-throw it at the end.

I tried to modify the implementation a little bit:

public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    Exception fault = null;
    return m_namespaceManager
        .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
        .Catch<NamespaceConnectionInfo, Exception>(exc =>
        {
            fault = exc;
            return Observable.Empty<NamespaceConnectionInfo>();
        })
        .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
        .Merge(maxConcurrentCalls)
        .Where(IsValid)
        .Select(ToWorkItem)
        .Where(o => o != null)
        .Finally(() =>
        {
            if (fault != null)
            {
                throw fault;
            }
        });
}

Needless to say, it did not work. Finally does not seem to propagate any exceptions, which I actually agree with.

So, what is the right way to achieve what I want?

EDIT

Unrelated to the question, I have found that the test code I use to collect the produced DataManagementWorkItem instances is bad. Instead of

    var obs = dm.GetWorkItemSource(10);
    obs.Subscribe(wi => workItems.Add(wi));
    await obs;

it should be

    await dm.GetWorkItemSource(1).Do(wi => workItems.Add(wi));

The difference is that the latter subscribes to the source of items just once, whereas the original version subscribed twice:

  1. by Subscribe
  2. by await

It does not affect the question, but it breaks my mocking code.
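To make the double subscription concrete, here is a minimal sketch, assuming the System.Reactive NuGet package. `DoubleSubscriptionDemo` is a hypothetical name, and the `Observable.Defer` source is only a stand-in for a cold observable like GetWorkItemSource whose setup (SQL query, mock) runs once per subscription. It counts how often that setup runs under each test pattern.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class DoubleSubscriptionDemo
{
    // Counts how often a cold source's factory runs for each test pattern.
    public static async Task<(int subscribeThenAwait, int doThenAwait)> CountSubscriptionsAsync()
    {
        int subscriptions = 0;

        // Hypothetical stand-in for GetWorkItemSource: the factory runs once per subscription.
        IObservable<int> source = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Range(1, 3);
        });

        // Original test code: Subscribe and await each create their own subscription.
        var first = new List<int>();
        source.Subscribe(first.Add);
        await source;
        int subscribeThenAwait = subscriptions;

        // Corrected test code: Do + await creates a single subscription.
        subscriptions = 0;
        var second = new List<int>();
        await source.Do(second.Add);
        int doThenAwait = subscriptions;

        return (subscribeThenAwait, doThenAwait);
    }
}
```

With the original pattern the factory runs twice; with `Do` plus `await` it runs once.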

Clarification

This is more of a clarification. Each namespace produces a sequence of 10 policy objects. The policy objects are produced sequentially, but asynchronously, and during all that time namespaces continue to be produced. Hence, given 25 namespaces before the fault, a produced namespace can be in one of three possible "states":

When an error occurs in the namespace production, the entire pipeline is aborted, regardless of which "state" the "good" namespaces are in at that moment.

Let us have a look at the following trivial example:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

namespace observables
{
    class Program
    {
        static void Main()
        {
            int count = 0;
            var obs = Observable
                .Interval(TimeSpan.FromMilliseconds(1))
                .Take(50)
                .Select(i =>
                {
                    if (25 == Interlocked.Increment(ref count))
                    {
                        throw new Exception("Boom!");
                    }
                    return i;
                })
                .Select(i => Observable.Defer(() => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j)))
                .Merge(10);

            var items = new HashSet<long>();
            try
            {
                obs.Do(i => items.Add(i)).GetAwaiter().GetResult();
            }
            catch (Exception exc)
            {
                Debug.WriteLine(exc.Message);
            }
            Debug.WriteLine(items.Count);
        }
    }
}

When I run it, I usually get the following output:

Boom!
192

It can also display 191. However, if we apply the fault Concat solution (even though, when there are no faults, it never completes, because the fault subject is never completed):

        int count = 0;
        var fault = new Subject<long>();
        var obs = Observable
            .Interval(TimeSpan.FromMilliseconds(1))
            .Take(50)
            .Select(i =>
            {
                if (25 == Interlocked.Increment(ref count))
                {
                    throw new Exception("Boom!");
                }
                return i;
            })
            .Catch<long, Exception>(exc =>
            {
                fault.OnError(exc);
                return Observable.Empty<long>();
            })
            .Select(i => Observable.Defer(() => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j)))
            .Merge(10)
            .Concat(fault);

Then the output is consistently 240 (24 good namespaces × 10 policies each), because all the asynchronous processes that have already started are allowed to complete.

An awkward solution based on the answer by pmccloghrylaing

    public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
    {
        var fault = new Subject<DataManagementWorkItem>();
        bool faulted = false;
        return m_namespaceManager
            .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
            .Catch<NamespaceConnectionInfo, Exception>(exc =>
            {
                faulted = true;
                return Observable.Throw<NamespaceConnectionInfo>(exc);
            })
            .Finally(() =>
            {
                if (!faulted)
                {
                    fault.OnCompleted();
                }
            })
            .Catch<NamespaceConnectionInfo, Exception>(exc =>
            {
                fault.OnError(exc);
                return Observable.Empty<NamespaceConnectionInfo>();
            })
            .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
            .Merge(maxConcurrentCalls)
            .Where(IsValid)
            .Select(ToWorkItem)
            .Where(o => o != null)
            .Concat(fault);
    }

It works both when the namespace production faults and when it succeeds, but it looks awkward. Also, multiple subscriptions still share the same fault subject. There must be a more elegant solution.

GetNamespaceConnectionInfoSource source code

public IObservable<NamespaceConnectionInfo> GetNamespaceConnectionInfoSource(bool? isActive = null,
    bool? isWorkflowEnabled = null, bool? isScheduleEnabled = null, bool? drainAndDisable = null,
    IEnumerable<string> nsList = null, string @where = null, IList<SqlParameter> whereParameters = null)
{
    IList<SqlParameter> parameters;
    var sql = GetNamespaceConnectionInfoSqls.GetSql(isActive,
        isWorkflowEnabled, isScheduleEnabled, drainAndDisable, nsList, @where, whereParameters, out parameters);
    var sqlUtil = m_sqlUtilProvider.Get(m_siteSettings.ControlDatabaseConnString);
    return sqlUtil.GetSource(typeof(NamespaceConnectionInfo), sqlUtil.GetReaderAsync(sql, parameters)).Cast<NamespaceConnectionInfo>();
}

public IObservable<DbDataReader> GetReaderAsync(string query, IList<SqlParameter> parameters = null, CommandBehavior commandBehavior = CommandBehavior.Default)
{
    return Observable.FromAsync(async () =>
    {
        SqlCommand command = null;
        try
        {
            var conn = await GetConnectionAsync();
            command = GetCommand(conn, query, parameters);
            return (DbDataReader)await command.ExecuteReaderAsync(commandBehavior | CommandBehavior.CloseConnection);
        }
        finally
        {
            DisposeSilently(command);
        }
    });
}

public IObservable<object> GetSource(Type objectType, IObservable<DbDataReader> readerTask)
{
    return Observable.Create<object>(async (obs, ct) => await PopulateSource(objectType, await readerTask, true, obs, ct));
}

private static async Task PopulateSource(Type objectType, DbDataReader reader, bool disposeReader, IObserver<object> obs, CancellationToken ct)
{
    try
    {
        if (IsPrimitiveDataType(objectType))
        {
            while (await reader.ReadAsync(ct))
            {
                obs.OnNext(reader[0]);
            }
        }
        else
        {
            // Get all the properties in our Object
            var typeReflector = objectType.GetTypeReflector(TypeReflectorCreationStrategy.PREPARE_DATA_RECORD_CONSTRUCTOR);

            // For each property get the data from the reader to the object
            while (await reader.ReadAsync(ct))
            {
                obs.OnNext(typeReflector.DataRecordConstructor == null ?
                    ReadNextObject(typeReflector, reader) :
                    typeReflector.DataRecordConstructor(reader));
            }
        }
    }
    catch (OperationCanceledException)
    {
    }
    finally
    {
        if (disposeReader)
        {
            reader.Dispose();
        }
    }
}

Upvotes: 2

Views: 2531

Answers (3)

Brandon

Reputation: 39192

Yeah, the basic problem is that Merge has a fail-fast implementation. If the source observable produces an error, or any of the inner observables produces an error, Merge fails the stream without waiting for the remaining inner observables to finish.

To achieve what you want, you need to "catch" the error before Merge sees it and "rethrow" it after the inner observables have finished:

public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    // wrap within Observable.Defer
    // so that each new subscription
    // gets its own Error subject
    return Observable.Defer(() =>
    {
        var error = new ReplaySubject<DataManagementWorkItem>(1);

        return m_namespaceManager
            .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
            .Catch((Exception err) =>
            {
                error.OnError(err);
                return Observable.Empty<NamespaceConnectionInfo>();
            })
            .Finally(error.OnCompleted)
            .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
            .Merge(maxConcurrentCalls)
            .Where(IsValid)
            .Select(ToWorkItem)
            .Where(o => o != null)
            .Concat(error);
    });
}
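A subject-free variant of the same catch-then-rethrow idea may be easier to reason about. The sketch below is not code from this thread: `DelayedFaultSketch`, `MergeThenRethrow` and `Run` are hypothetical names, and it assumes the System.Reactive NuGet package. Instead of a ReplaySubject, it stores the outer error in a per-subscription local and reads it in a deferred Concat tail, so the tail is only evaluated after Merge has drained every inner sequence that was already started. `Run` replays the trivial example from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;

public static class DelayedFaultSketch
{
    // Hypothetical helper, not an Rx operator: projects every item the outer
    // source manages to produce, lets all started inner sequences finish,
    // and only then re-raises the outer source's error, if there was one.
    public static IObservable<TResult> MergeThenRethrow<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, IObservable<TResult>> project,
        int maxConcurrent)
    {
        // Defer gives each subscription its own `fault` variable.
        return Observable.Defer(() =>
        {
            Exception fault = null;
            return source
                // Hold the outer error and complete instead, so Merge drains normally.
                .Catch((Exception ex) =>
                {
                    fault = ex;
                    return Observable.Empty<TSource>();
                })
                .Select(x => Observable.Defer(() => project(x)))
                .Merge(maxConcurrent)
                // Deferred tail: `fault` is read only after Merge has completed.
                .Concat(Observable.Defer(() => fault == null
                    ? Observable.Empty<TResult>()
                    : Observable.Throw<TResult>(fault)));
        });
    }

    // The trivial example from the question; outer item #25 throws "Boom!" when fail is true.
    public static (int itemCount, string error) Run(bool fail)
    {
        int count = 0;
        var outer = Observable
            .Interval(TimeSpan.FromMilliseconds(1))
            .Take(50)
            .Select(i =>
            {
                if (fail && Interlocked.Increment(ref count) == 25)
                {
                    throw new Exception("Boom!");
                }
                return i;
            });

        var items = new HashSet<long>();
        string error = null;
        try
        {
            outer
                .MergeThenRethrow(
                    i => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j),
                    10)
                .Do(x => items.Add(x))
                .GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            error = ex.Message;
        }
        return (items.Count, error);
    }
}
```

With `fail: true` this should collect 240 items before rethrowing "Boom!", matching the Concat experiment in the question. With `fail: false` it collects all 500 items and completes normally, which the plain Subject version in the question does not.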

Also, I note your unit test is subscribing twice to the returned observable, which is adding to your confusion. Once with a call to Subscribe to populate your list, and again with await. You really only want to subscribe once. We can use the .Do operator to populate your list and you should be able to inspect it within your error handler:

var dm = DependencyResolver.Instance.GetInstance<IDataManagement>();
var workItems = new List<DataManagementWorkItem>();
try
{
    var obs = dm.GetWorkItemSource(10).Do(workItems.Add);
    await obs;
    Assert.Fail("An expected exception was not thrown");
}
catch (Exception exc)
{
    AssertTheRightException(exc);
    // workItems should be populated.
}

Upvotes: 0

Enigmativity

Reputation: 117064

The call to m_namespaceManager.GetNamespaceConnectionInfoSource(true, drainAndDisable: false) returns an IObservable<NamespaceConnectionInfo>. Now, the contract for any single observable is this:

OnNext*(OnError|OnCompleted)

This means that you get zero or more values followed by one, and only one, of either an error or a completion.

You cannot get multiple errors from a single observable and you cannot get values after you have gotten an error.

If your observable does return more than one error it is breaking the normal Rx contract.
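A small sketch of that grammar being enforced, assuming the System.Reactive package (`GrammarDemo` is a hypothetical name): a `Subject<int>` is fed notifications that violate `OnNext*(OnError|OnCompleted)`, and the subscriber only ever sees the first value and the first error.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class GrammarDemo
{
    // Pushes notifications that break the Rx grammar and records what the subscriber sees.
    public static List<string> Observe()
    {
        var seen = new List<string>();
        var subject = new Subject<int>();
        subject.Subscribe(
            x => seen.Add("OnNext " + x),
            ex => seen.Add("OnError " + ex.Message),
            () => seen.Add("OnCompleted"));

        subject.OnNext(1);
        subject.OnError(new Exception("first"));

        // Everything below arrives after the terminal notification and is dropped.
        subject.OnNext(2);
        subject.OnError(new Exception("second"));
        subject.OnCompleted();

        return seen; // ["OnNext 1", "OnError first"]
    }
}
```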

So, given the existing code, it is impossible for you to delay errors until the end of the observable, because an error is the end of the observable.

What you can do is change the way GetNamespaceConnectionInfoSource produces its values, so that it generates multiple sequences and calls .Materialize() on each of them before merging them back into one. You would then have a single IObservable<Notification<NamespaceConnectionInfo>>, which can carry multiple errors and completions throughout the stream. You can then group this stream and handle the values before you handle the errors. But it all hinges on a change to GetNamespaceConnectionInfoSource, and since you haven't posted its source, I cannot give you the right code.

To help understand this, look at this code:

var xs = new [] { 1, 2, 3, 0, 4, 0, 5 }.ToObservable();

xs
    .Select(x =>
    {
        if (x == 0)
            throw new NotSupportedException();
        else
            return x;
    })
    .Subscribe(
        x => Console.WriteLine(x),
        ex => Console.WriteLine(ex.ToString()));

It produces this:

1
2
3
System.NotSupportedException: Specified method is not supported.
   at UserQuery.<Main>b__0(Int32 x) in query_ioaahp.cs:line 45
   at System.Reactive.Linq.ObservableImpl.Select`2._.OnNext(TSource value)

The values 4 & 5 simply do not get produced.

Now look at this code:

xs
    .Select(x =>
        Observable
            .Start(() =>
            {
                if (x == 0)
                    throw new NotSupportedException();
                else
                    return x;
            })
            .Materialize())
    .Merge()
    .Where(x => x.Kind != NotificationKind.OnCompleted)
    .Subscribe(
        x => Console.WriteLine(String.Format(
            "{0} {1}",
            x.Kind,
            x.HasValue ? x.Value.ToString() : "")),
        ex => Console.WriteLine(ex.ToString()));

This produces the following:

OnNext 1
OnNext 4
OnError 
OnError 
OnNext 5
OnNext 3
OnNext 2

It's out of order because of the introduced parallelism.

But now you can handle all of the errors.
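One way to finish that thought, as a sketch only: the hypothetical `ValuesThenErrors` operator below (not an Rx API; assumes the System.Reactive package) forwards every OnNext value as soon as it arrives, remembers every OnError notification, and raises them together as an `AggregateException` once the materialized stream completes. `MaterializeDemo.Run` reuses the `xs` example above, so it should yield the values 1 to 5 in some order plus two `NotSupportedException`s.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class MaterializeDemo
{
    // Hypothetical operator: stream values immediately, collect errors, raise them at the end.
    public static IObservable<T> ValuesThenErrors<T>(this IObservable<Notification<T>> notifications)
    {
        return Observable.Defer(() =>
        {
            var errors = new List<Exception>(); // one list per subscription
            return notifications
                .Do(n => { if (n.Kind == NotificationKind.OnError) errors.Add(n.Exception); })
                .Where(n => n.Kind == NotificationKind.OnNext)
                .Select(n => n.Value)
                // Evaluated only after the notification stream has completed.
                .Concat(Observable.Defer(() => errors.Count == 0
                    ? Observable.Empty<T>()
                    : Observable.Throw<T>(new AggregateException(errors))));
        });
    }

    public static (List<int> values, int errorCount) Run()
    {
        var xs = new[] { 1, 2, 3, 0, 4, 0, 5 }.ToObservable();
        var values = new List<int>();
        int errorCount = 0;
        try
        {
            xs.Select(x => Observable.Start(() =>
                    {
                        if (x == 0) throw new NotSupportedException();
                        return x;
                    }).Materialize())
              .Merge()
              .ValuesThenErrors()
              .Do(values.Add)
              .GetAwaiter().GetResult();
        }
        catch (AggregateException ex)
        {
            errorCount = ex.InnerExceptions.Count;
        }
        values.Sort(); // arrival order varies because of the parallelism
        return (values, errorCount);
    }
}
```

Because Merge serializes notifications, the `errors` list is never written concurrently.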

Upvotes: 1

pmccloghrylaing

Reputation: 1129

Would Concat solve your problem? I've wrapped it in Observable.Create with Finally to complete the faults subject.

public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    return Observable.Create<DataManagementWorkItem>((observer) =>
    {
        var faults = new Subject<DataManagementWorkItem>();
        return m_namespaceManager
            .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
            .Catch<NamespaceConnectionInfo, Exception>(exc =>
            {
                faults.OnError(exc);
                return Observable.Empty<NamespaceConnectionInfo>();
            })
            .Take(maxConcurrentCalls)
            .Select(nci => GetPolicySourceForNamespace(nci))
            .Merge()
            .Where(IsValid)
            .Select(ToWorkItem)
            .Where(o => o != null)
            .Finally(() => faults.OnCompleted())
            .Concat(faults)
            .Subscribe(observer);
    });
}

Also, does this return what you expect? (24 in your test)

m_namespaceManager
    .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
    .Catch<NamespaceConnectionInfo, Exception>(exc =>
    {
        faults.OnError(exc);
        return Observable.Empty<NamespaceConnectionInfo>();
    })
    .Count()

Upvotes: 0
