Reputation: 29
I have an application that, in a nutshell, creates objects of type "WebPage".
These objects are then inserted into a SQL database.
I want to retrieve these records from the database and then write them out to some files.
I created a while loop to read the results of the query; for each row returned, a WebPage object is created and added to a static ConcurrentQueue.
Here is where my problem is:
I want to have a separate thread that responds whenever something new appears on the ConcurrentQueue and writes the object out to my file. I already have this code working in a single-threaded, serial fashion, but I want to speed it up.
I currently have a piece of code inside the loop that reads from the SQL database: when the ConcurrentQueue reaches a certain number of objects, it signals an AutoResetEvent (see below):
    if (flow.CheckEngineCapacity >= 2000 || (Convert.ToInt32(totalRows) - numberOfRecords) < 2000)
    {
        waitHandle.Set();
        Thread fileProcessor = new Thread(delegate () { flow.ProcessExportEngineFlow(waitHandle); });
        fileProcessor.Start();
    }
What ends up happening is some sort of context switch where the main thread seems to sleep until that one completes. I did try to work with async and await, but I suspect that is not what I need.
How would I go about getting it to work in the following pattern?
Note that if the ConcurrentQueue hits a certain number of objects, it should block until the thread doing the Dequeue can free up some space.
The reason I am doing this is to make the solution as performant as possible: the only bottlenecks should be writing to files and reading from the database.
Below is an example of the class I have been trying to put together:
    public class EngineFlow
    {
        private static ConcurrentQueue<WebPages> _concurrentWebPageList = new ConcurrentQueue<WebPages>();

        public bool IncreaseEngineFlow(WebPages page)
        {
            bool sucessfullyadded = false;
            if (_concurrentWebPageList.Count <= 2000)
            {
                _concurrentWebPageList.Enqueue(page);
                sucessfullyadded = true;
            }
            else
            {
                return sucessfullyadded;
            }
            return sucessfullyadded;
        }

        public int CheckEngineCapacity { get { return _concurrentWebPageList.Count; } }

        private WebPages DecreaseEngineFlow()
        {
            WebPages page;
            _concurrentWebPageList.TryDequeue(out page);
            return page;
        }

        public void ProcessExportEngineFlow(AutoResetEvent waitHandle)
        {
            if (waitHandle.WaitOne() == false)
            {
                Thread.Sleep(100);
            }
            else
            {
                while (!_concurrentWebPageList.IsEmpty)
                {
                    Console.WriteLine(DecreaseEngineFlow().URL);
                    Console.WriteLine(CheckEngineCapacity);
                    waitHandle.Set();
                }
            }
        }
    }
Originally this was meant to be a producer and consumer, but I feel like I may be overthinking it.
Upvotes: 1
Views: 700
Reputation: 29
Thank you, @Henk Holterman.
The new class uses a BlockingCollection, which solved all the problems:
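    // Consumer: drains the collection until it is marked complete.
    Task.Run(() =>
    {
        flow.ProcessExportEngineFlow();
    });
    // Producer: reads rows and adds pages; Add blocks while the collection is full.
    Task.Run(() =>
    {
        while (reader.Read())
        {
            // webpage is built from the current row (construction omitted here)
            flow.IncreaseEngineFlow(webpage);
        }
        // Signal that no more pages are coming so the consumer loop can exit.
        flow.SetEngineCompleted();
    });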
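The blocking behaviour asked for in the question comes from the bounded capacity passed to the BlockingCollection constructor (1000 in the class definition below). As a rough illustration only, here is a small sketch with a bound of 2; the class name BoundedCapacityDemo and the use of int items are made up for the example. It uses TryAdd with a timeout so the full-queue case can be observed without hanging, whereas plain Add would simply wait.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical demo class, not part of the original EngineFlow code.
public static class BoundedCapacityDemo
{
    // Returns whether a third item was accepted while the queue was full,
    // and whether it was accepted after one item had been taken.
    public static (bool whileFull, bool afterTake) Run()
    {
        using (var queue = new BlockingCollection<int>(new ConcurrentQueue<int>(), 2))
        {
            queue.Add(1);
            queue.Add(2); // the queue is now at its bound of 2

            // TryAdd with a timeout gives up instead of waiting indefinitely.
            bool whileFull = queue.TryAdd(3, TimeSpan.FromMilliseconds(50));

            queue.Take(); // a consumer frees one slot
            bool afterTake = queue.TryAdd(3, TimeSpan.FromMilliseconds(50));

            return (whileFull, afterTake);
        }
    }
}
```

Calling BoundedCapacityDemo.Run() should give false for the first value and true for the second, which is the same back-pressure the producer task gets from Add.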
Class definition:
    public class EngineFlow
    {
        // Bounded to 1000 items: Add blocks when full, Take blocks when empty.
        private BlockingCollection<WebPages> _concurrentWebPageList = new BlockingCollection<WebPages>(new ConcurrentQueue<WebPages>(), 1000);
        //private static ConcurrentQueue<WebPages> _concurrentWebPageList = new ConcurrentQueue<WebPages>();

        public void IncreaseEngineFlow(WebPages page)
        {
            _concurrentWebPageList.Add(page);
        }

        public WebPages DecreaseEngineFlow()
        {
            return _concurrentWebPageList.Take();
        }

        public void ProcessExportEngineFlow()
        {
            // IsCompleted is true once CompleteAdding was called and the collection is empty.
            while (!_concurrentWebPageList.IsCompleted)
            {
                WebPages page = null;
                try
                {
                    page = _concurrentWebPageList.Take();
                }
                // Take throws if the collection is completed while we are waiting; just re-check the loop condition.
                catch (InvalidOperationException) { }
                if (page != null)
                {
                    Console.WriteLine(page.URL);
                }
            }
        }

        public bool GetEngineState()
        {
            return _concurrentWebPageList.IsCompleted;
        }

        public void SetEngineCompleted()
        {
            _concurrentWebPageList.CompleteAdding();
        }
    }
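For anyone who wants to try the pattern in isolation, here is a minimal, self-contained sketch of the same producer/consumer idea. It is not the exact code from my project: the WebPages stand-in, the ExportPipelineSketch class, and the Run method with its parameters are made up for the example, the list of URLs replaces the SQL reader, and a List replaces the file write. It also swaps the Take/try-catch loop for GetConsumingEnumerable, which blocks while the collection is empty and ends once CompleteAdding has been called and everything is drained.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the WebPages type used in the post.
public class WebPages
{
    public string URL { get; set; }
}

// Hypothetical sketch class, not the original EngineFlow.
public static class ExportPipelineSketch
{
    // Runs one producer and one consumer over a bounded queue and returns
    // the URLs in the order the consumer handled them.
    public static List<string> Run(IEnumerable<string> urls, int capacity)
    {
        var written = new List<string>();
        using (var queue = new BlockingCollection<WebPages>(new ConcurrentQueue<WebPages>(), capacity))
        {
            // Consumer: the enumeration blocks while the queue is empty and
            // finishes after CompleteAdding once the queue is drained.
            Task consumer = Task.Run(() =>
            {
                foreach (WebPages page in queue.GetConsumingEnumerable())
                {
                    written.Add(page.URL); // stand-in for the file write
                }
            });

            // Producer: Add blocks whenever the queue already holds `capacity` items.
            Task producer = Task.Run(() =>
            {
                try
                {
                    foreach (string url in urls) // stand-in for reader.Read()
                    {
                        queue.Add(new WebPages { URL = url });
                    }
                }
                finally
                {
                    queue.CompleteAdding(); // always release the consumer
                }
            });

            // Wait for both sides before the queue is disposed.
            Task.WaitAll(producer, consumer);
        }
        return written;
    }
}
```

With a single producer and a single consumer on a ConcurrentQueue-backed collection, ExportPipelineSketch.Run(new[] { "a", "b", "c" }, 2) should return the URLs in the same order they were added. Putting CompleteAdding in a finally block means the consumer is released even if the producer throws.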
Upvotes: 2