VA systems engineer

Reputation: 3159

Is this C# Parallel.ForEach "localFinally" method thread safe?

Because of the benefits stated by Microsoft, I'm implementing a large series of SQL SELECT statements (several thousand of them) using Parallel.ForEach. (BTW: I can't find it now, but I'm pretty sure I read somewhere that Parallel.ForEach won't return control until all iterations have been executed. Is this true?)

I began with the question "Parallel.Foreach SQL querying sometimes results in Connection". I don't want to use its accepted answer because it abandoned Parallel.ForEach and used just Task.Run instead (or Task.Factory.StartNew for .NET 4.0). At the same time, the O.P. used "lock" to synchronize updates to a list of DataTable, which I understand may lessen the efficiency of Parallel.ForEach.

So, using the article "How to: Write a Parallel.ForEach Loop with Thread-Local Variables", I wrote the strawman code below. The goal is to add each threadLocalDataTable (one per SELECT statement) to the dataTableList that is returned once all queries are done. It works, but I wonder whether the localFinally method is really thread safe (see the line of code with the comment //<-- the localFinally method in question). Note: the SqlOperation class holds a connection string, an output DataTable name, and a SELECT query string.

public static IList<DataTable> GetAllData(IEnumerable<SqlOperation> sqlList)
{
    IList<DataTable> dataTableList = new List<DataTable>();
    dataTableList.Clear();

    try
    {

        Parallel.ForEach<SqlOperation, DataTable>(sqlList, () => null, (s, loop, threadLocalDataTable) =>
        {

            DataTable dataTable = null;

            using (SqlCommand sqlCommand = new SqlCommand())
            {
                using (SqlConnection sqlConnection = new SqlConnection(s.ConnectionString))
                {
                    sqlConnection.Open();
                    sqlCommand.CommandType = CommandType.Text;
                    sqlCommand.Connection = sqlConnection;
                    sqlCommand.CommandText = s.SelectQuery;
                    SqlDataAdapter sqlDataAdapter = new SqlDataAdapter(sqlCommand);

                    dataTable = new DataTable
                    {
                        TableName = s.DataTableName
                    };

                    dataTable.Clear();
                    dataTable.Rows.Clear();
                    dataTable.Columns.Clear();

                    sqlDataAdapter.Fill(dataTable);
                    sqlDataAdapter.Dispose();
                    sqlConnection.Close();
                }
            }

            return dataTable;


        }, (threadLocalDataTable) => dataTableList.Add(threadLocalDataTable) //<-- the localFinally method in question
        );
    }
    catch (Exception ex)
    {
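        // Concatenate instead of passing the exception text as a format string,
        // which would throw if the message contained '{' or '}'.
        Console.WriteLine("GetAllData error: " + ex);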
    }

    return dataTableList;
}

Upvotes: 3

Views: 1406

Answers (1)

Evk

Reputation: 101453

The documentation you linked in the question clearly states (about localFinally):

This delegate can be invoked concurrently by multiple tasks

So no, it's not thread safe. But that's not the only problem - your whole implementation is incorrect.

The "thread local" variable passed to each loop iteration is an accumulator. It accumulates the results obtained by multiple iterations running on the same thread. Your implementation completely ignores that variable and always returns a single data table, so each iteration replaces the table returned by the previous one and only the last table per thread reaches localFinally. That means if you have more tables than the parallel loop has threads (the count is chosen by the scheduler and is typically around the number of cores of your processor) - you are actually losing results, because you ignore the accumulator.

The correct way would be to use a List<DataTable> as the accumulator, not a single DataTable. For example:

Parallel.ForEach<SqlOperation, List<DataTable>>(
    sqlList, 
    () => new List<DataTable>(), // initialize accumulator
    (s, loop, threadLocalDataTables) =>
    {
        DataTable result = ...;
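        // add without a lock: this list is private to the current thread
        threadLocalDataTables.Add(result);
        // return the accumulator so the next iteration receives it
        return threadLocalDataTables;
    }, (threadLocalDataTables) =>
    {
        //<-- the localFinally method in question
        // dataTableList is shared, so lock before merging results
        // (AddRange requires it to be declared as List<DataTable>, not IList<DataTable>)
        lock (dataTableList)
           dataTableList.AddRange(threadLocalDataTables);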
    });
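
To see the accumulator pattern run end to end without a database, here is a minimal sketch. The names LocalStateDemo, CollectSquares and gate are made up for illustration, and squaring an integer stands in for "run one query and build a DataTable". It only aims to show that every input produces exactly one entry in the shared list when the local list is returned from the body and merged under a lock.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class LocalStateDemo
{
    // Hypothetical stand-in for the SQL work: each input yields one result (its square).
    public static List<int> CollectSquares(IEnumerable<int> inputs)
    {
        var results = new List<int>();   // shared list, only touched under the lock
        var gate = new object();         // dedicated lock object (an assumption, any private object works)

        Parallel.ForEach<int, List<int>>(
            inputs,
            () => new List<int>(),       // localInit: a fresh private accumulator
            (n, loopState, local) =>
            {
                local.Add(n * n);        // no lock: 'local' is not shared
                return local;            // pass the same accumulator to the next iteration
            },
            local =>
            {
                lock (gate)              // localFinally can run concurrently, so synchronize
                {
                    results.AddRange(local);
                }
            });

        return results;
    }

    public static void Main()
    {
        List<int> squares = CollectSquares(Enumerable.Range(0, 1000));
        Console.WriteLine(squares.Count); // one result per input
    }
}
```

The lock is taken once per accumulator rather than once per item, which is the point of using the local state in the first place.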

That said - using Parallel.ForEach to speed up IO-bound work (like running SQL queries) is not a good idea; async/await will do it better. But that's out of scope for this question.
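
For reference only, a rough sketch of what the async/await shape could look like. Everything here is an assumption for illustration: AsyncQueryDemo, RunQueryAsync and GetAllAsync are invented names, and Task.Delay merely simulates waiting on the database instead of a real ADO.NET call.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncQueryDemo
{
    // Hypothetical placeholder for one query; Task.Delay simulates the IO wait.
    public static async Task<string> RunQueryAsync(string tableName)
    {
        await Task.Delay(50);
        return tableName + ":done";
    }

    public static async Task<IList<string>> GetAllAsync(IEnumerable<string> tableNames)
    {
        // Start every operation, then await them together.
        // Task.WhenAll returns the results in the same order as the input tasks.
        string[] results = await Task.WhenAll(
            tableNames.Select(name => RunQueryAsync(name)));
        return results;
    }

    public static async Task Main()
    {
        IList<string> results = await GetAllAsync(new[] { "A", "B", "C" });
        Console.WriteLine(string.Join(", ", results));
    }
}
```

No shared list or lock is needed because each task returns its own result. With thousands of real queries you would probably want to cap how many run at once (for example with a SemaphoreSlim) rather than start them all together.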

Upvotes: 4
