Reputation: 62764
I have the following method:
public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    return m_namespaceManager
        .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
        .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
        .Merge(maxConcurrentCalls)
        .Where(IsValid)
        .Select(ToWorkItem)
        .Where(o => o != null);
}
It implements the following logic:
1. Obtain an IObservable<NamespaceConnectionInfo> from the namespace manager (GetNamespaceConnectionInfoSource).
2. For each namespace, obtain the IObservable<DataManagementPolicy> corresponding to the particular namespace (GetPolicySourceForNamespace). However, use the Merge operator to restrict the number of concurrent calls to GetPolicySourceForNamespace.
3. Filter out invalid DataManagementPolicy records (cannot be done in SQL).
4. Translate the remaining DataManagementPolicy records to DataManagementWorkItem instances. Some could turn out as null, so they are filtered out at the end.
The GetNamespaceConnectionInfoSource can fault after having produced a certain number of valid NamespaceConnectionInfo objects. It is entirely possible that a certain number of DataManagementWorkItem objects have already been produced by that time in the final observable sequence.
I have a unit test, where:
- GetNamespaceConnectionInfoSource throws after having produced 25 namespaces
- GetPolicySourceForNamespace produces 10 objects per namespace
I am also interested to examine the items produced in the final observable before it is faulted:
var dm = DependencyResolver.Instance.GetInstance<IDataManagement>();
var workItems = new List<DataManagementWorkItem>();
try
{
    var obs = dm.GetWorkItemSource(10);
    obs.Subscribe(wi => workItems.Add(wi));
    await obs;
    Assert.Fail("An expected exception was not thrown");
}
catch (Exception exc)
{
    AssertTheRightException(exc);
}
The workItems collection has a different number of items every time. One run it has 69 items, another - 50, yet another - 18.
My interpretation is that when the fault occurs there are good NamespaceConnectionInfo and DataManagementPolicy objects in various phases of processing, all of which get aborted because of the fault. The amount is different each time, because the items are produced asynchronously.
And here lies my problem - I do not want them to be aborted. I want them to run to completion, be produced in the final observable sequence and only then to communicate the fault. In essence I want to hold the exception and re-throw it at the end.
I tried to modify the implementation a little bit:
public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    Exception fault = null;
    return m_namespaceManager
        .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
        .Catch<NamespaceConnectionInfo, Exception>(exc =>
        {
            fault = exc;
            return Observable.Empty<NamespaceConnectionInfo>();
        })
        .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
        .Merge(maxConcurrentCalls)
        .Where(IsValid)
        .Select(ToWorkItem)
        .Where(o => o != null)
        .Finally(() =>
        {
            if (fault != null)
            {
                throw fault;
            }
        });
}
Needless to say - it did not work. Finally does not seem to propagate any exceptions, which I actually agree with.
So, what is the right way to achieve what I want?
EDIT
Unrelated to the question, I have found that the test code I use to collect the produced DataManagementWorkItem instances is bad. Instead of
var obs = dm.GetWorkItemSource(10);
obs.Subscribe(wi => workItems.Add(wi));
await obs;
it should be
await dm.GetWorkItemSource(1).Do(wi => workItems.Add(wi));
The difference is that the latter subscribes to the source of items just once, whereas the original version subscribed twice:
- once through Subscribe
- once through await
It does not affect the question, but it screws up my mocking code.
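The double subscription can be seen in isolation with a small sketch. It assumes a cold source built with Observable.Defer as a stand-in for GetWorkItemSource; the names DoubleSubscriptionDemo and CountSubscriptionsAsync are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class DoubleSubscriptionDemo
{
    // Returns how many times the cold source was subscribed to.
    public static async Task<int> CountSubscriptionsAsync(bool subscribeTwice)
    {
        int subscriptions = 0;

        // Hypothetical stand-in for dm.GetWorkItemSource(...):
        // the Defer factory runs once per subscription.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Range(1, 3);
        });

        if (subscribeTwice)
        {
            source.Subscribe(_ => { }); // first subscription
            await source;               // awaiting subscribes a second time
        }
        else
        {
            await source.Do(_ => { });  // single subscription
        }

        return subscriptions;
    }
}
```

With a cold source every subscription re-runs the whole pipeline, so side effects in the mocks are triggered once per subscription.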
Clarification
This is more of a clarification. Each namespace produces a sequence of 10 policy objects. But this process is asynchronous - the policy objects are produced sequentially, but asynchronously. During all that time namespaces continue to be produced, and hence, given 25 namespaces before the fault, there are three possible "states" in which a produced namespace can be:
When an error in the namespace production occurs the entire pipeline is aborted, regardless of the "state" in which "good" namespaces are right now.
Let us have a look at the following trivial example:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
namespace observables
{
    class Program
    {
        static void Main()
        {
            int count = 0;
            var obs = Observable
                .Interval(TimeSpan.FromMilliseconds(1))
                .Take(50)
                .Select(i =>
                {
                    if (25 == Interlocked.Increment(ref count))
                    {
                        throw new Exception("Boom!");
                    }
                    return i;
                })
                .Select(i => Observable.Defer(() => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j)))
                .Merge(10);
            var items = new HashSet<long>();
            try
            {
                obs.Do(i => items.Add(i)).GetAwaiter().GetResult();
            }
            catch (Exception exc)
            {
                Debug.WriteLine(exc.Message);
            }
            Debug.WriteLine(items.Count);
        }
    }
}
When I run it I usually have the following output:
Boom!
192
But, it could also display 191. However, if we apply the fault concat solution (even if it does not work when there are no faults):
int count = 0;
var fault = new Subject<long>();
var obs = Observable
    .Interval(TimeSpan.FromMilliseconds(1))
    .Take(50)
    .Select(i =>
    {
        if (25 == Interlocked.Increment(ref count))
        {
            throw new Exception("Boom!");
        }
        return i;
    })
    .Catch<long, Exception>(exc =>
    {
        fault.OnError(exc);
        return Observable.Empty<long>();
    })
    .Select(i => Observable.Defer(() => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j)))
    .Merge(10)
    .Concat(fault);
Then the output is consistently 240 (24 good namespaces times 10 items each), because we let all the asynchronous processes that have already been started run to completion.
An awkward solution based on answer by pmccloghrylaing
public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    var fault = new Subject<DataManagementWorkItem>();
    bool faulted = false;
    return m_namespaceManager
        .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
        .Catch<NamespaceConnectionInfo, Exception>(exc =>
        {
            faulted = true;
            return Observable.Throw<NamespaceConnectionInfo>(exc);
        })
        .Finally(() =>
        {
            if (!faulted)
            {
                fault.OnCompleted();
            }
        })
        .Catch<NamespaceConnectionInfo, Exception>(exc =>
        {
            fault.OnError(exc);
            return Observable.Empty<NamespaceConnectionInfo>();
        })
        .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
        .Merge(maxConcurrentCalls)
        .Where(IsValid)
        .Select(ToWorkItem)
        .Where(o => o != null)
        .Concat(fault);
}
It works both when the namespace production faults and when it is successful, but it looks so awkward. Plus, multiple subscriptions still share the same fault subject. There must be a more elegant solution.
GetNamespaceConnectionInfoSource source code
public IObservable<NamespaceConnectionInfo> GetNamespaceConnectionInfoSource(bool? isActive = null,
    bool? isWorkflowEnabled = null, bool? isScheduleEnabled = null, bool? drainAndDisable = null,
    IEnumerable<string> nsList = null, string @where = null, IList<SqlParameter> whereParameters = null)
{
    IList<SqlParameter> parameters;
    var sql = GetNamespaceConnectionInfoSqls.GetSql(isActive,
        isWorkflowEnabled, isScheduleEnabled, drainAndDisable, nsList, @where, whereParameters, out parameters);
    var sqlUtil = m_sqlUtilProvider.Get(m_siteSettings.ControlDatabaseConnString);
    return sqlUtil.GetSource(typeof(NamespaceConnectionInfo), sqlUtil.GetReaderAsync(sql, parameters)).Cast<NamespaceConnectionInfo>();
}
public IObservable<DbDataReader> GetReaderAsync(string query, IList<SqlParameter> parameters = null, CommandBehavior commandBehavior = CommandBehavior.Default)
{
    return Observable.FromAsync(async () =>
    {
        SqlCommand command = null;
        try
        {
            var conn = await GetConnectionAsync();
            command = GetCommand(conn, query, parameters);
            return (DbDataReader)await command.ExecuteReaderAsync(commandBehavior | CommandBehavior.CloseConnection);
        }
        finally
        {
            DisposeSilently(command);
        }
    });
}
public IObservable<object> GetSource(Type objectType, IObservable<DbDataReader> readerTask)
{
    return Observable.Create<object>(async (obs, ct) => await PopulateSource(objectType, await readerTask, true, obs, ct));
}
private static async Task PopulateSource(Type objectType, DbDataReader reader, bool disposeReader, IObserver<object> obs, CancellationToken ct)
{
    try
    {
        if (IsPrimitiveDataType(objectType))
        {
            while (await reader.ReadAsync(ct))
            {
                obs.OnNext(reader[0]);
            }
        }
        else
        {
            // Get all the properties in our Object
            var typeReflector = objectType.GetTypeReflector(TypeReflectorCreationStrategy.PREPARE_DATA_RECORD_CONSTRUCTOR);
            // For each property get the data from the reader to the object
            while (await reader.ReadAsync(ct))
            {
                obs.OnNext(typeReflector.DataRecordConstructor == null ?
                    ReadNextObject(typeReflector, reader) :
                    typeReflector.DataRecordConstructor(reader));
            }
        }
    }
    catch (OperationCanceledException)
    {
    }
    finally
    {
        if (disposeReader)
        {
            reader.Dispose();
        }
    }
}
Upvotes: 2
Views: 2531
Reputation: 39192
Yeah, the basic problem is that Merge has a fail-fast implementation. If the source observable produces an error, or any of the inner observables produces an error, then Merge fails the stream without waiting for the remaining inner observables to finish.
To achieve what you want, you need to "catch" the error before Merge sees it, and "rethrow" it after the inner observables have finished:
public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    // wrap within Observable.Defer
    // so that each new subscription
    // gets its own Error subject
    return Observable.Defer(() =>
    {
        var error = new ReplaySubject<DataManagementWorkItem>(1);
        return m_namespaceManager
            .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
            // the lambda parameter is typed explicitly so that
            // the exception type argument of Catch can be inferred
            .Catch((Exception err) =>
            {
                error.OnError(err);
                return Observable.Empty<NamespaceConnectionInfo>();
            })
            .Finally(error.OnCompleted)
            .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
            .Merge(maxConcurrentCalls)
            .Where(IsValid)
            .Select(ToWorkItem)
            .Where(o => o != null)
            .Concat(error);
    });
}
Also, I note your unit test is subscribing twice to the returned observable, which is adding to your confusion. Once with a call to Subscribe to populate your list, and again with await. You really only want to subscribe once. We can use the .Do operator to populate your list and you should be able to inspect it within your error handler:
var dm = DependencyResolver.Instance.GetInstance<IDataManagement>();
var workItems = new List<DataManagementWorkItem>();
try
{
    var obs = dm.GetWorkItemSource(10).Do(workItems.Add);
    await obs;
    Assert.Fail("An expected exception was not thrown");
}
catch (Exception exc)
{
    AssertTheRightException(exc);
    // workItems should be populated.
}
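The catch-before-Merge, rethrow-after-Concat pattern from this answer could also be packaged as a reusable operator. The following is only a sketch: SelectManyDelayOuterError and its containing class are hypothetical names, not part of Rx.NET, and the operator delays only errors from the outer source. Errors raised by an inner observable still fail fast.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class ObservableDelayErrorExtensions
{
    // Hypothetical helper (not an Rx.NET operator): projects each element to an
    // inner observable, merges with bounded concurrency, and reports an error of
    // the OUTER source only after all started and queued inner observables finish.
    public static IObservable<TResult> SelectManyDelayOuterError<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, IObservable<TResult>> selector,
        int maxConcurrent)
    {
        return Observable.Defer(() =>
        {
            // One subject per subscription; it only ever carries the
            // terminal notification (OnError or OnCompleted).
            var terminal = new ReplaySubject<TResult>(1);

            return source
                .Catch((Exception ex) =>
                {
                    terminal.OnError(ex);               // remember the fault
                    return Observable.Empty<TSource>(); // let Merge see a normal completion
                })
                .Finally(terminal.OnCompleted)          // no-op if OnError was already sent
                .Select(x => Observable.Defer(() => selector(x)))
                .Merge(maxConcurrent)
                .Concat(terminal);                      // replay the fault at the very end
        });
    }
}
```

Under these assumptions the method in the question would reduce to a call such as source.SelectManyDelayOuterError(GetPolicySourceForNamespace, maxConcurrentCalls) followed by the existing Where and Select steps.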
Upvotes: 0
Reputation: 117064
The call to m_namespaceManager.GetNamespaceConnectionInfoSource(true, drainAndDisable: false) returns an IObservable<NamespaceConnectionInfo>. Now, the contract for any single observable is this:
OnNext*(OnError|OnCompleted)
This means that you get zero or more values followed by one, and only one, of either an error or a completion.
You cannot get multiple errors from a single observable and you cannot get values after you have gotten an error.
If your observable does return more than one error it is breaking the normal Rx contract.
So, given this, it is impossible for you, given the existing code, to delay errors until the end of the observable because an error is the end of the observable.
What you can do is change the way that you produce your values in GetNamespaceConnectionInfoSource so that it generates multiple sequences and calls .Materialize() on each before merging them back into one. This means that you would have a single IObservable<Notification<NamespaceConnectionInfo>>, and that can have multiple errors and completions throughout the stream. You can then group this stream and handle the values before you handle the errors. But it all hinges on a change to GetNamespaceConnectionInfoSource, and since you haven't posted the source for this I cannot give you the right code.
To help understand this, look at this code:
var xs = new [] { 1, 2, 3, 0, 4, 0, 5 }.ToObservable();
xs
    .Select(x =>
    {
        if (x == 0)
            throw new NotSupportedException();
        else
            return x;
    })
    .Subscribe(
        x => Console.WriteLine(x),
        ex => Console.WriteLine(ex.ToString()));
It produces this:
1
2
3
System.NotSupportedException: Specified method is not supported.
at UserQuery.<Main>b__0(Int32 x) in query_ioaahp.cs:line 45
at System.Reactive.Linq.ObservableImpl.Select`2._.OnNext(TSource value)
The values 4 & 5 simply do not get produced.
Now look at this code:
xs
    .Select(x =>
        Observable
            .Start(() =>
            {
                if (x == 0)
                    throw new NotSupportedException();
                else
                    return x;
            })
            .Materialize())
    .Merge()
    .Where(x => x.Kind != NotificationKind.OnCompleted)
    .Subscribe(
        x => Console.WriteLine(String.Format(
            "{0} {1}",
            x.Kind,
            x.HasValue ? x.Value.ToString() : "")),
        ex => Console.WriteLine(ex.ToString()));
This produces the following:
OnNext 1
OnNext 4
OnError
OnError
OnNext 5
OnNext 3
OnNext 2
It's out of order because of the introduced parallelism.
But now you can handle all of the errors.
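One way to "handle the values before you handle the errors" is to let the values through as they arrive, collect the materialized errors on the side, and raise them together once everything has finished. This is a sketch under the same toy setup as above; MaterializeDemo and ProcessAll are hypothetical names, and wrapping the failures in an AggregateException is just one possible choice.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

static class MaterializeDemo
{
    // Emits every value that could be computed and signals the collected
    // failures as a single AggregateException after all work has finished.
    public static IObservable<int> ProcessAll(IObservable<int> xs)
    {
        return Observable.Defer(() =>
        {
            // Per-subscription error list; Merge serializes its notifications,
            // so the Do callback below is not invoked concurrently.
            var errors = new List<Exception>();

            return xs
                .Select(x => Observable
                    .Start(() =>
                    {
                        if (x == 0)
                            throw new NotSupportedException();
                        return x;
                    })
                    .Materialize())              // errors become ordinary values
                .Merge()
                .Do(n =>
                {
                    if (n.Kind == NotificationKind.OnError)
                        errors.Add(n.Exception); // set aside for later
                })
                .Where(n => n.Kind == NotificationKind.OnNext)
                .Select(n => n.Value)
                .Concat(Observable.Defer(() => errors.Count == 0
                    ? Observable.Empty<int>()
                    : Observable.Throw<int>(new AggregateException(errors))));
        });
    }
}
```

A subscriber then sees the successful values, in whatever order the parallel work completes, and only afterwards a single error that carries every individual failure in its InnerExceptions collection.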
Upvotes: 1
Reputation: 1129
Would Concat solve your problem? I've wrapped it in Observable.Create with Finally to complete the faults subject.
public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    return Observable.Create<DataManagementWorkItem>((observer) =>
    {
        var faults = new Subject<DataManagementWorkItem>();
        return m_namespaceManager
            .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
            .Catch<NamespaceConnectionInfo, Exception>(exc =>
            {
                faults.OnError(exc);
                return Observable.Empty<NamespaceConnectionInfo>();
            })
            // Merge(maxConcurrentCalls) limits concurrency; Take would instead
            // truncate the sequence to the first maxConcurrentCalls namespaces
            .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
            .Merge(maxConcurrentCalls)
            .Where(IsValid)
            .Select(ToWorkItem)
            .Where(o => o != null)
            .Finally(() => faults.OnCompleted())
            .Concat(faults)
            .Subscribe(observer);
    });
}
Also, does this return what you expect? (24 in your test)
m_namespaceManager
    .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
    .Catch<NamespaceConnectionInfo, Exception>(exc =>
    {
        faults.OnError(exc);
        return Observable.Empty<NamespaceConnectionInfo>();
    })
    .Count()
Upvotes: 0