Reputation: 13955
I'm trying to make 4,000+ database calls run simultaneously. But what I'm finding is that instead, all of them will stack up, and none of them will finish until all of them have started.
What I'm trying so far is based on this thread:
Process thousands of database calls simultaneously
The logic works perfectly when it's done in a console app with no database call. But here, it's behaving as I described above.
using Dapper;
public async Task ProcessFileAsync(Stream blobFile)
{
List<Customer> customers = LoadCustomers(blobFile)
var tasks = RunWithMaxDegreeOfConcurrency(10, customers, customer => VerifyCustomerAsync(customer));
await Task.WhenAll(tasks);
DoStuffWhenAllDatabaseCallsAreFinished()
}
private async Task VerifyCustomerAsync(Customer customer)
{
RecordLog(LogType.Info, $"Starting {customer.CustomerName}");
var parameters = new DynamicParameters();
// ... create parameters
ValidaitonResult validaitonResult = null;
using (var connection = new SqlConnection(ConfigurationManager.ConnectionStrings["FubarConnection"].ConnectionString))
{
connection.Open();
var queryResult = await connection.QueryAsync<ValidaitonResult>("sp_name", parameters, commandType: CommandType.StoredProcedure);
validaitonResult = queryResult.FirstOrDefault();
}
// Handle the result
switch (validaitonResult.ValidaitonAction)
{
case ValidaitonAction.NoAction:
_customersNoAction.Add(customer);
break;
case ValidaitonAction.Insert:
_customersToInsert.Add(customer);
break;
default:
break;
}
RecordLog(LogType.Info, $"Finished {customer.CustomerName}");
}
private static async Task RunWithMaxDegreeOfConcurrency<T>(int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
var activeTasks = new List<Task>(maxDegreeOfConcurrency);
foreach (var task in collection.Select(taskFactory))
{
activeTasks.Add(task);
if (activeTasks.Count == maxDegreeOfConcurrency)
{
await Task.WhenAny(activeTasks.ToArray());
activeTasks.RemoveAll(t => t.IsCompleted);
}
}
await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t =>
{
//observe exceptions in a manner consistent with the above
});
}
Upvotes: 1
Views: 216
Reputation: 13955
Well, I feel silly. The problem was a typo in the name of the SP. The code works as written.
Upvotes: 0
Reputation: 89071
I don't see anything wrong with your code, apart from a lack of error handling. Try a simplified database operation like this:
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Dapper;
namespace ConsoleApp26
{
class Customer
{
public string CustomerName { get; set; }
}
class Program
{
private object _customersNoAction;
public async Task ProcessFileAsync(Stream blobFile)
{
List<Customer> customers = Enumerable.Range(1, 1000).Select(i => new Customer() { CustomerName = $"Customer{i}" } ).ToList();
var tasks = RunWithMaxDegreeOfConcurrency(100, customers, customer => VerifyCustomerAsync(customer));
await Task.WhenAll(tasks);
DoStuffWhenAllDatabaseCallsAreFinished();
}
private void DoStuffWhenAllDatabaseCallsAreFinished()
{
RecordLog(LogType.Info, $"Finished");
}
private async Task VerifyCustomerAsync(Customer customer)
{
RecordLog(LogType.Info, $"Starting {customer.CustomerName}");
var parameters = new DynamicParameters();
// ... create parameters
ValidaitonResult validaitonResult = null;
using (var connection = new SqlConnection("server=.;database=tempdb;integrated security=true"))
{
connection.Open();
//var queryResult = await connection.QueryAsync<ValidaitonResult>("sp_name", parameters, commandType: CommandType.StoredProcedure);
var queryResult = await connection.QueryAsync<ValidaitonResult>("waitfor delay '0:0:2'; select 1 ValidationAction");
validaitonResult = queryResult.FirstOrDefault();
}
// Handle the result
RecordLog(LogType.Info, $"--Finished {customer.CustomerName}");
}
private void RecordLog(object info, string v)
{
Console.WriteLine($"{v}running on thread {System.Threading.Thread.CurrentThread.ManagedThreadId}");
}
private static async Task RunWithMaxDegreeOfConcurrency<T>(int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
var activeTasks = new List<Task>(maxDegreeOfConcurrency);
foreach (var task in collection.Select(taskFactory))
{
activeTasks.Add(task);
if (activeTasks.Count == maxDegreeOfConcurrency)
{
await Task.WhenAny(activeTasks.ToArray());
foreach (var t in activeTasks)
{
if (t.IsFaulted)
throw t.Exception;
}
activeTasks.RemoveAll(t => t.IsCompleted);
}
}
await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t =>
{
//observe exceptions in a manner consistent with the above
});
}
static void Main(string[] args)
{
var p = new Program();
p.ProcessFileAsync(null).Wait();
}
private class LogType
{
internal static readonly int Info = 1;
}
}
internal class ValidaitonResult
{
public int ValidaitonAction { get; internal set; }
}
}
Upvotes: 1
Reputation: 4244
Async != parallel. If you want to simulate parallel change RunWithMaxDegreeOfConcurrency
to use Parallel.Foreach
Upvotes: 0