Reputation: 387
I have an array of files like this..
string[] unZippedFiles; the idea is that I want to parse these files in paralle. As they are parsed a record gets placed on a concurrentbag. As record is getting placed I want to kick of the update function.
Here is what I am doing in my Main():
foreach(var file in unZippedFiles)
{ Parallel.Invoke
(
() => ImportFiles(file),
() => UpdateTest()
);
}
this is what the code of Update loooks like.
static void UpdateTest( )
{
Console.WriteLine("Updating/Inserting merchant information.");
while (!merchCollection.IsEmpty || producingRecords )
{
merchant x;
if (merchCollection.TryTake(out x))
{
UPDATE_MERCHANT(x.m_id, x.mInfo, x.month, x.year);
}
}
}
This is what the import code looks like. It's pretty much a giant string parser.
System.IO.StreamReader SR = new System.IO.StreamReader(fileName);
long COUNTER = 0;
StringBuilder contents = new StringBuilder( );
string M_ID = "";
string BOF_DELIMITER = "%%MS_SKEY_0000_000_PDF:";
string EOF_DELIMITER = "%%EOF";
try
{
record_count = 0;
producingRecords = true;
for (COUNTER = 0; COUNTER <= SR.BaseStream.Length - 1; COUNTER++)
{
if (SR.EndOfStream)
{
break;
}
contents.AppendLine(Strings.Trim(SR.ReadLine()));
contents.AppendLine(System.Environment.NewLine);
//contents += Strings.Trim(SR.ReadLine());
//contents += Strings.Chr(10);
if (contents.ToString().IndexOf((EOF_DELIMITER)) > -1)
{
if (contents.ToString().StartsWith(BOF_DELIMITER) & contents.ToString().IndexOf(EOF_DELIMITER) > -1)
{
string data = contents.ToString();
M_ID = data.Substring(data.IndexOf("_M") + 2, data.Substring(data.IndexOf("_M") + 2).IndexOf("_"));
Console.WriteLine("Merchant: " + M_ID);
merchant newmerch;
newmerch.m_id = M_ID;
newmerch.mInfo = data.Substring(0, (data.IndexOf(EOF_DELIMITER) + 5));
newmerch.month = DateTime.Now.AddMonths(-1).Month;
newmerch.year = DateTime.Now.AddMonths(-1).Year;
//Update(newmerch);
merchCollection.Add(newmerch);
}
contents.Clear();
//GC.Collect();
}
}
SR.Close();
// UpdateTest();
}
catch (Exception ex)
{
producingRecords = false;
}
finally
{
producingRecords = false;
}
}
the problem i am having is that the Update runs once and then the importfile function just takes over and does not yield to the update function. Any ideas on what am I doing wrong would be of great help.
Upvotes: 1
Views: 766
Reputation: 9587
Here's my stab at fixing your thread synchronisation. Note that I haven't changed any of the code from the functional standpoint (with the exception of taking out the catch
- it's generally a bad idea; exceptions need to be propagated).
Forgive if something doesn't compile - I'm writing this based on incomplete snippets.
Main
foreach(var file in unZippedFiles)
{
using (var merchCollection = new BlockingCollection<merchant>())
{
Parallel.Invoke
(
() => ImportFiles(file, merchCollection),
() => UpdateTest(merchCollection)
);
}
}
Update
private void UpdateTest(BlockingCollection<merchant> merchCollection)
{
Console.WriteLine("Updating/Inserting merchant information.");
foreach (merchant x in merchCollection.GetConsumingEnumerable())
{
UPDATE_MERCHANT(x.m_id, x.mInfo, x.month, x.year);
}
}
Import
Don't forget to pass in merchCollection
as a parameter - it should not be static.
System.IO.StreamReader SR = new System.IO.StreamReader(fileName);
long COUNTER = 0;
StringBuilder contents = new StringBuilder( );
string M_ID = "";
string BOF_DELIMITER = "%%MS_SKEY_0000_000_PDF:";
string EOF_DELIMITER = "%%EOF";
try
{
record_count = 0;
for (COUNTER = 0; COUNTER <= SR.BaseStream.Length - 1; COUNTER++)
{
if (SR.EndOfStream)
{
break;
}
contents.AppendLine(Strings.Trim(SR.ReadLine()));
contents.AppendLine(System.Environment.NewLine);
//contents += Strings.Trim(SR.ReadLine());
//contents += Strings.Chr(10);
if (contents.ToString().IndexOf((EOF_DELIMITER)) > -1)
{
if (contents.ToString().StartsWith(BOF_DELIMITER) & contents.ToString().IndexOf(EOF_DELIMITER) > -1)
{
string data = contents.ToString();
M_ID = data.Substring(data.IndexOf("_M") + 2, data.Substring(data.IndexOf("_M") + 2).IndexOf("_"));
Console.WriteLine("Merchant: " + M_ID);
merchant newmerch;
newmerch.m_id = M_ID;
newmerch.mInfo = data.Substring(0, (data.IndexOf(EOF_DELIMITER) + 5));
newmerch.month = DateTime.Now.AddMonths(-1).Month;
newmerch.year = DateTime.Now.AddMonths(-1).Year;
//Update(newmerch);
merchCollection.Add(newmerch);
}
contents.Clear();
//GC.Collect();
}
}
SR.Close();
// UpdateTest();
}
finally
{
merchCollection.CompleteAdding();
}
}
Upvotes: 4