dalemac

Reputation: 494

C# Parallel.ForEach - Making variables thread safe

I have been rewriting some process-intensive looping to use TPL to increase speed. This is the first time I have tried threading, so I want to check that what I am doing is the correct way to do it.

The results are good - processing the data from 1000 rows in a DataTable has reduced processing time from 34 minutes to 9 minutes when moving from a standard foreach loop to a Parallel.ForEach loop. For this test, I removed non-thread-safe operations, such as writing data to a log file and incrementing a counter.

I still need to write back into a log file and increment a counter, so I tried implementing a lock that encloses the StreamWriter/increment code block.

FileStream filestream = new FileStream("path_to_file.txt", FileMode.Create);
StreamWriter streamwriter = new StreamWriter(filestream);
streamwriter.AutoFlush = true;

try
{
    object locker = new object();

    // Let's assume we have a DataTable containing 1000 rows of data.
    DataTable datatable_results;

    if (datatable_results.Rows.Count > 0)
    {
        int row_counter = 0;

        Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
        { 
            // Process data_row as normal.

            // When ready to write to log, do so.
            lock (locker)
            {
                row_counter++;
                streamwriter.WriteLine("Processing row: {0}", row_counter);

                // Write any data we want to log.
            }
        });                    
    }
}
catch (Exception e)
{
    // Catch the exception.
}
streamwriter.Close();

The above seems to work as expected, with minimal performance cost (execution time is still 9 minutes). Granted, the actions contained in the lock are hardly significant in themselves - I assume that as the code inside the lock takes longer to run, threads are blocked for longer and the overall processing time suffers more.

My question: is the above an efficient way of doing this, or is there a different way of achieving it that is either faster or safer?

Also, let's say our original DataTable actually contains 30,000 rows. Is there anything to be gained by splitting this DataTable into chunks of 1,000 rows each and then processing them in the Parallel.ForEach, instead of processing all 30,000 rows in one go?

Upvotes: 2

Views: 18147

Answers (5)

Sriram Sakthivel

Reputation: 73502

Writing to the file is expensive, and you're holding an exclusive lock while doing it; that's bad, because it introduces contention.

You could add the output to a buffer instead, then write it to the file all at once. That should remove the contention and provide a way to scale.

if (datatable_results.Rows.Count > 0)
{
    ConcurrentQueue<string> buffer = new ConcurrentQueue<string>();
    Parallel.ForEach(datatable_results.AsEnumerable(), (data_row, state, index) =>
    {
        // Process data_row as normal.

        // When ready to write to log, do so.

        buffer.Enqueue(string.Format( "Processing row: {0}", index));
    });

    streamwriter.AutoFlush = false;
    string line;
    while (buffer.TryDequeue(out line))
    {
        streamwriter.WriteLine(line);
    }
    streamwriter.Flush();//Flush once when needed
}
  1. Note that you don't need to maintain a loop counter; Parallel.ForEach provides you one. The difference is that it is the item's index rather than a count of completed rows. If that changes the behavior you expect, you can still add the counter back and use Interlocked.Increment to increment it.
  2. I see that you're using streamwriter.AutoFlush = true; that will hurt performance. You can set it to false and flush once you're done writing all the data.

If possible, wrap the StreamWriter in a using statement, so that you don't even need to flush the stream (you get it for free on dispose).
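
For illustration, a minimal sketch of the buffered approach with the writer wrapped in a using statement - this reuses the file path, datatable_results and the buffer idea from above, and is only a sketch, not a drop-in replacement:

var buffer = new ConcurrentQueue<string>();

Parallel.ForEach(datatable_results.AsEnumerable(), (data_row, state, index) =>
{
    // Process data_row as normal, then queue the log line instead of writing it.
    buffer.Enqueue(string.Format("Processing row: {0}", index));
});

// Disposing the writer at the end of the using block flushes and closes the file.
using (var streamwriter = new StreamWriter("path_to_file.txt"))
{
    string line;
    while (buffer.TryDequeue(out line))
    {
        streamwriter.WriteLine(line);
    }
}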

Alternatively, you could look at logging frameworks, which do this job pretty well - for example NLog or log4net.

Upvotes: 9

Gabe

Reputation: 86848

First of all, it takes about 2 seconds to process a row in your table and perhaps a few milliseconds to increment the counter and write to the log file. With the actual processing being 1000x more than the part you need to serialize, the method doesn't matter too much.

Furthermore, the way you have implemented it is perfectly solid. There are ways to optimize it, but none that are worth implementing in your situation.

One useful way to avoid locking on the increment is to use Interlocked.Increment. It is a bit slower than x++ but much faster than lock {x++;}. In your case, though, it doesn't matter.
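
A minimal sketch of what that looks like, assuming the same datatable_results and a shared row_counter as in the question (illustrative only, not the poster's code):

// Interlocked lives in System.Threading.
int row_counter = 0;

Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
{
    // Process data_row as normal.

    // Atomic, lock-free increment; the return value is this call's
    // post-increment count, which can be used in the log line if needed.
    int current = Interlocked.Increment(ref row_counter);
});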

As for the file output, remember that the output is going to be serialized anyway, so at best you can minimize the amount of time spent in the lock. You can do this by buffering all of your output before entering the lock and then just performing the write operation inside the lock. You probably want to do async writes to avoid unnecessarily blocking on I/O.
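
Combining the two suggestions, a hedged sketch (again reusing the question's locker, streamwriter and row_counter) might look like this - everything except the actual write happens outside the lock:

Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
{
    // Process data_row as normal.

    // Build the complete log text before taking the lock.
    int current = Interlocked.Increment(ref row_counter);
    string logText = string.Format("Processing row: {0}", current);

    // Only the file write itself is serialized.
    lock (locker)
    {
        streamwriter.WriteLine(logText);
    }
});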

Upvotes: 1

Mike Sage

Reputation: 266

This is my code that uses Parallel.For. The concept is similar, and perhaps easier for you to implement. FYI, for debugging I keep a regular for loop in the code and conditionally compile the parallel version. Hope this helps. Note that the value of i in this scenario isn't the same as the number of records processed. You could create a counter, protect it with a lock, and add to it. In my other code where I do have a counter, I didn't use a lock and just allowed the value to be potentially off, to avoid the slower code; I have a status mechanism to indicate the number of records processed, and the slight chance that the count is off is not an issue - at the end of the loop I put out a message saying all the records have been processed.

#if DEBUG
        for (int i = 0; i < stend.PBBIBuckets.Count; i++)
        {
            //int serverIndex = 0;
#else
        ParallelOptions options = new ParallelOptions();
        options.MaxDegreeOfParallelism = m_maxThreads;

        Parallel.For(0, stend.PBBIBuckets.Count, options, (i) =>

        {
#endif
            g1client.Message request;
            DataTable requestTable;

            request = new g1client.Message();

            requestTable = request.GetDataTable();

            requestTable.Columns.AddRange(
                Locations.Columns.Cast<DataColumn>().Select(x => new DataColumn(x.ColumnName, x.DataType)).ToArray
                    ());

            FillPBBIRequestTables(requestTable, request, stend.PBBIBuckets[i], stend.BucketLen[i], stend.Hierarchies);
#if DEBUG
        }
#else
        });
#endif

Upvotes: 0

Tigran
Tigran

Reputation: 62265

You may be able to improve this if you avoid logging altogether, or log only into a thread-specific log file (not sure if that makes sense for you).

By default, the TPL starts roughly as many threads as you have cores (see: Does Parallel.ForEach limits the number of active threads?).

So what you can do is:

1) Get the number of cores on the target machine

2) Create a list of counters, with one element per core

3) Have each thread update its own counter

4) Sum them all up after the parallel execution terminates.

So, in practice:

//KEY(THREAD ID, VALUE: THREAD LOCAL COUNTER)
Dictionary<int,int> counters = new Dictionary<int, int>(NUMBER_OF_CORES); 
....
 Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
        { 
            // Process data_row as normal.

            // When ready to write to log, do so.
            //lock (locker) //NO NEED FOR LOCK, EVERY THREAD UPDATES ITS _OWN_ COUNTER
            //{
                //row_counter++;

                 counters[Thread.CurrentThread.ManagedThreadId] += 1;

                //NO WRITING, OR WRITE TO A THREAD-SPECIFIC FILE ONLY
                //streamwriter.WriteLine("Processing row: {0}", row_counter);


            //}
        });             
....

//AFTER EXECUTION OF PARALLEL LOOP SUM ALL COUNTERS AND GET TOTAL OF ALL THREADS. 

The benefit of this is that no locking is involved at all, which will dramatically improve performance. When you use the .NET concurrent collections, they always use some kind of locking (or other synchronization) internally.

This is naturally just a basic idea and may not work as expected if you copy-paste it. We are talking about multithreading, which is always a hard topic. But, hopefully, it gives you some ideas to rely on.
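
For what it's worth, Parallel.ForEach also has an overload with thread-local state (localInit/localFinally) that expresses the same per-thread counter idea without a shared dictionary. A minimal sketch, assuming the question's datatable_results:

int total = 0;

Parallel.ForEach(
    datatable_results.AsEnumerable(),
    () => 0,                               // localInit: each thread starts its own counter at 0
    (data_row, state, localCount) =>
    {
        // Process data_row as normal.
        return localCount + 1;             // no locking: the running count is thread-local
    },
    localCount => Interlocked.Add(ref total, localCount)); // localFinally: merge each thread's count once

// After the loop, total holds the number of rows processed.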

Upvotes: 1

Evgeniy Mironov

Reputation: 787

You can move part of the code from the parallel loop into a new method. For example:

    // Class scope
    private string GetLogRecord(int rowCounter, DataRow row)
    {
        return string.Format("Processing row: {0}", rowCounter); // Write any data we want to log.
    }

    //....
    Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
    { 
        // Process data_row as normal.

        // When ready to write to log, do so.
        int current;
        lock (locker)
            current = ++row_counter;

        var logRecord = GetLogRecord(current, data_row);

        lock (locker)
            streamwriter.WriteLine(logRecord);
    });

Upvotes: 0
