Cod29

Reputation: 295

ConcurrentDictionary and ConcurrentBag for AddOrUpdate in parallel

Is this the correct way to use ConcurrentDictionary and ConcurrentBag to AddOrUpdate values?

Basically I tried to do the following:

  1. I have a file with millions of records, and I am trying to process each record and extract it into an object.

  2. Each entry is a key-value pair, where the key is the WBAN and the value is an object.

    var cd = new ConcurrentDictionary<String, ConcurrentBag<Data>>();
    int count = 0;

    foreach (var line in File.ReadLines(path).AsParallel().WithDegreeOfParallelism(5))
    {
        var sInfo = line.Split(new char[] { ',' });
        cd.AddOrUpdate(sInfo[0],
            new ConcurrentBag<Data>()
            {
                new Data()
                {
                    WBAN = sInfo[0],
                    Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
                    time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
                }
            },
            (oldKey, oldValue) =>
            {
                oldValue.Add(new Data()
                {
                    WBAN = sInfo[0],
                    Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
                    time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
                });

                return oldValue;
            });
    }
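
For reference, the Data type is not shown above; it is presumed to be a simple holder for the three parsed fields, roughly like this:

    public class Data
    {
        public string WBAN { get; set; }
        public string Date { get; set; }
        public string time { get; set; }
    }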
    

Upvotes: 1

Views: 1959

Answers (2)

Theodor Zoulias

Reputation: 43515

Your idea is basically correct, but there is a flaw in the implementation. Enumerating a ParallelQuery with the foreach statement does not cause the code inside the loop to run in parallel; by that stage the parallelization phase has already been completed. In your code no work is actually parallelized, because no operator is attached after the .AsParallel().WithDegreeOfParallelism(5). To do the looping in parallel you must replace the foreach with the ForAll operator, like this:

File.ReadLines(path)
    .AsParallel()
    .WithDegreeOfParallelism(5)
    .ForAll(line => { /* Process each line in parallel */ });

It is important to understand what is parallelized here. The processing of each line is parallelized, while the loading of the lines from the filesystem is not; the loading is serialized. The worker threads employed by the Parallel LINQ engine (one of which is the current thread) are synchronized when accessing the source IEnumerable (the File.ReadLines(path) in this case).

Using a nested ConcurrentDictionary<String, ConcurrentBag<Data>> structure to store the processed lines is not very efficient. You can trust PLINQ to do a better job at grouping the data than you could do manually with concurrent collections. By using the ToLookup operator you can get an ILookup<string, Data>, which is essentially a read-only dictionary with multiple values for each key.

var separators = new char[] { ',' };

var lookup = File.ReadLines(path)
    .AsParallel()
    .WithDegreeOfParallelism(5)
    .Select(line => line.Split(separators))
    .ToLookup(sInfo => sInfo[0], sInfo => new Data()
    {
        WBAN =  sInfo[0],
        Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
        time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
    });

This should be a better option regarding performance and memory efficiency, unless you specifically want the resulting structure to be mutable and thread-safe for some reason.
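
As a usage example, the resulting lookup can then be consumed like a read-only multi-valued dictionary (the WBAN value below is hypothetical):

// Enumerate all Data entries recorded for one WBAN (hypothetical key).
foreach (Data data in lookup["03013"])
{
    Console.WriteLine($"{data.WBAN} {data.Date} {data.time}");
}

// Or iterate every group in the lookup.
foreach (IGrouping<string, Data> group in lookup)
{
    Console.WriteLine($"{group.Key}: {group.Count()} records");
}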

Two more notes:

  1. Hardcoding the degree of parallelism (5 in this case) is OK, provided that you know the hardware where your program will run. Otherwise it may cause friction by over-subscription (having more threads than the machine has actual cores). Hint: virtual machines are often configured with a single core.

  2. The ConcurrentBag<T> is a very specialized collection. In the majority of cases you'll get better performance with a ConcurrentQueue<T>. Both classes offer a similar API. People probably prefer the ConcurrentBag<T> because its Add method is more familiar than the Enqueue. Both notes are illustrated in the sketch below.
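
For illustration, here is how both notes could be applied to the code from the question (a sketch only; it also uses GetOrAdd, which is a simpler fit than AddOrUpdate when the per-key collection is itself thread-safe):

var cd = new ConcurrentDictionary<String, ConcurrentQueue<Data>>();

File.ReadLines(path)
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount) // adapt to the actual cores of the machine
    .ForAll(line =>
    {
        var sInfo = line.Split(new char[] { ',' });
        // GetOrAdd returns the existing queue for this WBAN, or creates a new one.
        cd.GetOrAdd(sInfo[0], _ => new ConcurrentQueue<Data>())
          .Enqueue(new Data()
          {
              WBAN = sInfo[0],
              Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
              time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
          });
    });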

Upvotes: 1

Dai

Reputation: 155145

  • Your program is IO-bound, not CPU-bound, so there is no advantage to parallelizing your processing.
    • It's IO-bound because your program can't process a line of data without having first read that line from the file, and generally speaking, computers read data from storage much more slowly than they can process it.
    • As your program is performing only trivial string operations on each line read, I can safely say with 99.9% confidence that the time it takes to add a Data element to a Dictionary<String,List<Data>> is a tiny tiny tiny fraction of the time it takes for your computer to read a single line from a text-file.
  • Also, avoid using File.ReadAllLines for programs like these, because it first reads the entire file into memory.
    • If you look at my solution, you'll see it uses StreamReader to read each line one-by-one which means it doesn't need to wait until it reads everything into memory first.

So to parse that file with the best possible performance you don't need any Concurrent collections.

Just this:

private static readonly Char[] _sep = new Char[] { ',' }; // Declared here to ensure only a single array allocation.

public static async Task< Dictionary<String,List<Data>> > ReadFileAsync( FileInfo file )
{
    const Int32 ONE_MEGABYTE = 1 * 1024 * 1024; // Use 1MB+ sized buffers for async IO. Not smaller buffers like 1024 or 4096 as those are for synchronous IO.

    Dictionary<String,List<Data>> dict = new Dictionary<String,List<Data>>( capacity: 1024 );

    using( FileStream fs = new FileStream( file.FullName, FileMode.Open, FileAccess.Read, FileShare.Read, ONE_MEGABYTE, FileOptions.Asynchronous | FileOptions.SequentialScan ) )
    using( StreamReader rdr = new StreamReader( fs ) )
    {
        String line;
        while( ( line = await rdr.ReadLineAsync().ConfigureAwait(false) ) != null )
        {
            String[] values = line.Split( _sep );
            if( values.Length < 3 ) continue;

            Data d = new Data()
            {
                WBAN = values[0],
                Date = values[1],
                time = values[2]
            };

            if( !dict.TryGetValue( d.WBAN, out List<Data> list ) )
            {
                dict[ d.WBAN ] = list = new List<Data>();
            }

            list.Add( d );
        }
    }

    return dict;
}
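
A call site for this method might look like the following sketch (the file path is hypothetical, and an async Main requires C# 7.1 or later):

public static async Task Main( String[] args )
{
    FileInfo file = new FileInfo( @"C:\data\wban.csv" ); // Hypothetical path - substitute your own file.
    Dictionary<String,List<Data>> dict = await ReadFileAsync( file );
    Console.WriteLine( "Loaded {0} distinct WBAN keys.", dict.Count );
}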

Update: Hypothetically speaking...

Hypothetically speaking, because file IO (especially asynchronous FileStream IO) uses large buffers (in this case, a ONE_MEGABYTE-sized buffer) the program could pass each buffer (as it's read, sequentially) into a parallel processor.

However, the problem is that the data inside that buffer cannot be trivially apportioned out to individual threads: because the length of a line is not fixed, a single thread still needs to read through the entire buffer to find out where the line-breaks are. Technically that reading could be parallelized somewhat, but it would add a huge amount of complexity (you would also need to handle lines that cross buffer boundaries, buffers that contain only a single line, and so on).

And at this small scale the overhead of using the thread-pool and concurrent collection types would erase the speed-up of parallel processing, given that the program would still largely be IO-bound.

Now, if you had a file sized in the gigabytes, with Data records that were sized around 1KB then I would go into detail demonstrating how you could do it because at that scale you probably would see a modest performance boost.

Upvotes: 0
