Waddaulookingat

Reputation: 387

Why is Parallel.Invoke not working in this case?

I have an array of files like this:

    string[] unZippedFiles;

The idea is that I want to parse these files in parallel. As they are parsed, a record gets placed in a ConcurrentBag. As each record is placed, I want to kick off the update function.

Here is what I am doing in my Main():

    foreach (var file in unZippedFiles)
    {
        Parallel.Invoke(
            () => ImportFiles(file),
            () => UpdateTest()
        );
    }

This is what the code of UpdateTest looks like:

    static void UpdateTest()
    {
        Console.WriteLine("Updating/Inserting merchant information.");

        while (!merchCollection.IsEmpty || producingRecords)
        {
            merchant x;
            if (merchCollection.TryTake(out x))
            {
                UPDATE_MERCHANT(x.m_id, x.mInfo, x.month, x.year);
            }
        }
    }

This is what the import code looks like. It's pretty much a giant string parser.

      System.IO.StreamReader SR = new System.IO.StreamReader(fileName);
             long COUNTER = 0;
             StringBuilder contents = new StringBuilder( );
             string M_ID = "";

             string BOF_DELIMITER = "%%MS_SKEY_0000_000_PDF:";
             string EOF_DELIMITER = "%%EOF";

             try
             {
                 record_count = 0;
                 producingRecords = true;
                 for (COUNTER = 0; COUNTER <= SR.BaseStream.Length - 1; COUNTER++)
                 {
                     if (SR.EndOfStream)
                     {
                         break;
                     }
                     contents.AppendLine(Strings.Trim(SR.ReadLine()));
                     contents.AppendLine(System.Environment.NewLine);
                     //contents += Strings.Trim(SR.ReadLine());
                     //contents += Strings.Chr(10);
                     if (contents.ToString().IndexOf((EOF_DELIMITER)) > -1)
                     {
                         if (contents.ToString().StartsWith(BOF_DELIMITER) & contents.ToString().IndexOf(EOF_DELIMITER) > -1)
                         {
                             string data = contents.ToString();
                             M_ID = data.Substring(data.IndexOf("_M") + 2, data.Substring(data.IndexOf("_M") + 2).IndexOf("_"));
                             Console.WriteLine("Merchant: " + M_ID);
                             merchant newmerch;
                             newmerch.m_id = M_ID;
                             newmerch.mInfo = data.Substring(0, (data.IndexOf(EOF_DELIMITER) + 5));
                             newmerch.month = DateTime.Now.AddMonths(-1).Month;
                             newmerch.year = DateTime.Now.AddMonths(-1).Year;
                             //Update(newmerch);
                             merchCollection.Add(newmerch);
                         }
                         contents.Clear();
                         //GC.Collect();
                     }
                 }

                 SR.Close();
                 // UpdateTest();

             }
             catch (Exception ex)
             {
                 producingRecords = false;

             }
             finally
             {
                 producingRecords = false;
             }
         }

The problem I am having is that UpdateTest runs once, and then the ImportFiles function takes over and does not yield to the update function. Any ideas on what I am doing wrong would be a great help.

Upvotes: 1

Views: 766

Answers (1)

Kirill Shlenskiy

Reputation: 9587

Here's my stab at fixing your thread synchronisation. Note that I haven't changed any of the code from a functional standpoint, with the exception of taking out the catch block: swallowing exceptions is generally a bad idea, and they need to be propagated.

Forgive me if something doesn't compile; I'm writing this based on incomplete snippets.

Main

    foreach (var file in unZippedFiles)
    {
        using (var merchCollection = new BlockingCollection<merchant>())
        {
            Parallel.Invoke(
                () => ImportFiles(file, merchCollection),
                () => UpdateTest(merchCollection)
            );
        }
    }

Update

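    // Declared static so that it can be called from the static Main, as in the question.
    private static void UpdateTest(BlockingCollection<merchant> merchCollection)
    {
        Console.WriteLine("Updating/Inserting merchant information.");

        // Blocks while the collection is empty and ends once CompleteAdding() has been called.
        foreach (merchant x in merchCollection.GetConsumingEnumerable())
        {
            UPDATE_MERCHANT(x.m_id, x.mInfo, x.month, x.year);
        }
    }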

Import

Don't forget to pass in merchCollection as a parameter; it should no longer be a static field, since Main now creates a fresh collection for each file.

         System.IO.StreamReader SR = new System.IO.StreamReader(fileName);
         long COUNTER = 0;
         StringBuilder contents = new StringBuilder( );
         string M_ID = "";

         string BOF_DELIMITER = "%%MS_SKEY_0000_000_PDF:";
         string EOF_DELIMITER = "%%EOF";

         try
         {
             record_count = 0;

             for (COUNTER = 0; COUNTER <= SR.BaseStream.Length - 1; COUNTER++)
             {
                 if (SR.EndOfStream)
                 {
                     break;
                 }
                 contents.AppendLine(Strings.Trim(SR.ReadLine()));
                 contents.AppendLine(System.Environment.NewLine);
                 //contents += Strings.Trim(SR.ReadLine());
                 //contents += Strings.Chr(10);
                 if (contents.ToString().IndexOf((EOF_DELIMITER)) > -1)
                 {
                     if (contents.ToString().StartsWith(BOF_DELIMITER) & contents.ToString().IndexOf(EOF_DELIMITER) > -1)
                     {
                         string data = contents.ToString();
                         M_ID = data.Substring(data.IndexOf("_M") + 2, data.Substring(data.IndexOf("_M") + 2).IndexOf("_"));
                         Console.WriteLine("Merchant: " + M_ID);
                         merchant newmerch;
                         newmerch.m_id = M_ID;
                         newmerch.mInfo = data.Substring(0, (data.IndexOf(EOF_DELIMITER) + 5));
                         newmerch.month = DateTime.Now.AddMonths(-1).Month;
                         newmerch.year = DateTime.Now.AddMonths(-1).Year;
                         //Update(newmerch);
                         merchCollection.Add(newmerch);
                     }
                     contents.Clear();
                     //GC.Collect();
                 }
             }

             SR.Close();
             // UpdateTest();

         }
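         finally
         {
             // Tell the consumer that no more records are coming, even if parsing threw,
             // so that its GetConsumingEnumerable() loop can finish.
             merchCollection.CompleteAdding();
         }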
     }
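
To see the three pieces working together without the file parsing, here is a minimal, self-contained sketch of the same producer/consumer handshake. The names `ProducerConsumerSketch`, `Produce` and `Consume` are hypothetical stand-ins for your `ImportFiles` and `UpdateTest`, and plain strings stand in for your `merchant` records, so treat it as an illustration of the pattern rather than a drop-in replacement.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ProducerConsumerSketch
{
    // Hypothetical stand-in for ImportFiles: adds records, then marks the collection complete.
    public static void Produce(IEnumerable<string> records, BlockingCollection<string> queue)
    {
        try
        {
            foreach (var record in records)
            {
                queue.Add(record);
            }
        }
        finally
        {
            // Without this call the consumer's foreach would block forever.
            queue.CompleteAdding();
        }
    }

    // Hypothetical stand-in for UpdateTest: waits while the collection is empty and
    // returns once it is both empty and marked as complete.
    public static List<string> Consume(BlockingCollection<string> queue)
    {
        var processed = new List<string>();
        foreach (var record in queue.GetConsumingEnumerable())
        {
            processed.Add(record);
        }
        return processed;
    }

    public static void Main()
    {
        var records = new[] { "M001", "M002", "M003" };
        List<string> processed = null;

        using (var queue = new BlockingCollection<string>())
        {
            Parallel.Invoke(
                () => Produce(records, queue),
                () => processed = Consume(queue));
        }

        // A default BlockingCollection is backed by a FIFO queue, so with one
        // producer and one consumer this prints: M001,M002,M003
        Console.WriteLine(string.Join(",", processed));
    }
}
```

The consumer no longer needs a flag like `producingRecords`: it cannot exit early because the collection happens to be empty, and it cannot spin forever because `CompleteAdding()` sits in a `finally` block. Keep in mind that `Parallel.Invoke` does not promise that both actions really run at the same time; with an unbounded collection, as here, the result is still correct if the producer happens to finish before the consumer starts.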

Upvotes: 4
