Reputation: 62732
I have the following code:
innerExceptions = dbconnByServer
.AsParallel()
.WithDegreeOfParallelism(dbconnByServer.Count)
// A stream of groups of server connections proceeding in parallel per server
.Select(dbconns => dbconns.Select(dbconn => m_sqlUtilProvider.Get(dbconn)))
// A stream of groups of SqlUtil objects proceeding in parallel per server
.Select(sqlUtils => GetTaskException(sqlUtils
// Aggregate SqlUtil objects to form a single Task which runs the SQL asynchronously for the first SqlUtil, then upon completion
// for the next SqlUtil and so long until all the SqlUtil objects are processed asynchronously one after another.
.Aggregate<ISqlUtil, Task>(null, (res, sqlUtil) =>
{
if (res == null)
{
return sqlUtil.ExecuteSqlAsync(SQL, parameters);
}
return res.ContinueWith(_ => sqlUtil.ExecuteSqlAsync(SQL, parameters)).Unwrap();
})))
.Where(e => e != null)
.ToList();
Where:
private static Exception GetTaskException(Task t)
{
try
{
t.Wait();
return null;
}
catch (Exception exc)
{
return exc;
}
}
What this code does is execute certain SQL statement across a multitude of db connections, where some connections may belong to one DB server, while others - to another and so on.
The code makes sure that two conditions hold:
Given N db connections per some DB server there will be a single Task
in the end of the aggregation, executing which has the following effect:
My problem is that right now exceptions are lost except of the very first db connection. I know I should examine the _
argument and somehow process the _.Exception
property inside the continuation function. I wonder if there is an elegant way to do it.
Any ideas?
Upvotes: 5
Views: 462
Reputation: 244757
With that, my code for doing this would look like this:
public Task<IEnumerable<Exception>> ExecuteOnServersAsync(
IList<IEnumerable<Connection>> dbConnByServer,
string sql, object parameters)
{
var tasks = new List<Task>();
var exceptions = new ConcurrentQueue<Exception>();
Action<Task> handleException = t =>
{
if (t.IsFaulted)
exceptions.Enqueue(t.Exception);
};
foreach (var dbConns in dbConnByServer)
{
Task task = null;
foreach (var dbConn in dbConns)
{
var sqlUtil = m_sqlUtilProvider.Get(dbConn);
if (task == null)
{
task = sqlUtil.ExecuteSqlAsync(sql, parameters);
}
else
{
task = task.ContinueWith(
t =>
{
handleException(t);
return sqlUtil.ExecuteSqlAsync(sql, parameters);
}).Unwrap();
}
}
if (task != null)
{
task = task.ContinueWith(handleException);
tasks.Add(task);
}
}
return Task.Factory.ContinueWhenAll(
tasks.ToArray(), _ => exceptions.AsEnumerable());
}
Upvotes: 1