DumbJustice
DumbJustice

Reputation: 29

Read and Write to ConcurrentQueue across threads (C#)

I Have an application , in a nutshell it creates objects of type "WebPage".

These objecsts are then inserted into a SQL database.

I want to retrive these records from the database , and then load them into some files.

I create a While loop to read the results of the query , and for each row returned a Webpage object is created and added to a static ConcurrentQueue.

Here is where my problem is :

I want to have a seperate thread , that when something new appears on the ConcurrentQueue - it responds and writes the object out to my file. I already have this code working in a single threaded and serial fashion , but I want to speed it up.

I currently have a piece of code inside the reader from the SQL database , when the ConcurrentQueue reaches a certain amount of objects - it sends an autoreset event (see below)

if(flow.CheckEngineCapacity >= 2000 || (Convert.ToInt32(totalRows) - numberOfRecords) < 2000)
{
          waitHandle.Set();
          Thread fileProcessor = new Thread(delegate () { flow.ProcessExportEngineFlow(waitHandle); });
          fileProcessor.Start();
 }

what ends up happening is some sort of context switch where the main thread seems to sleep until that one completes - I did attempt to try work with await and async but suspect that is not what I need.

How would I go about getting it work in the following pattern

NOTE that if the concurrentqueue hits a certain amount of objects it should block until the thread doing Dequeue can free up some space.

The reason I am doing this is to make the solution as performant as possible - the bottlenecks should be write to files and read from database.

The below is the example of the class I have been trying to put together :

public class EngineFlow
{
    private static ConcurrentQueue<WebPages> _concurrentWebPageList = new ConcurrentQueue<WebPages>();

    public bool IncreaseEngineFlow(WebPages page)
    {
        bool sucessfullyadded = false;
        if (_concurrentWebPageList.Count <= 2000)
        {
            _concurrentWebPageList.Enqueue(page);
            sucessfullyadded = true;
        }
        else
        {
            return sucessfullyadded;
        }
        return sucessfullyadded;
    }

    public int CheckEngineCapacity { get { return _concurrentWebPageList.Count; } }

    private WebPages DecreaseEngineFlow()
    {
        WebPages page;
        _concurrentWebPageList.TryDequeue(out page);
        return page;
    }

    public void ProcessExportEngineFlow(AutoResetEvent waitHandle)
    {
        if (waitHandle.WaitOne() == false)
        {
            Thread.Sleep(100);
        }
        else
        {
            while (!_concurrentWebPageList.IsEmpty)
            {
                Console.WriteLine(DecreaseEngineFlow().URL);
                Console.WriteLine(CheckEngineCapacity);
                waitHandle.Set();
            }
        }
    }

Originally this was meant to be a producer and consumer but I feel like I may be overthinking it.

Upvotes: 1

Views: 700

Answers (1)

DumbJustice
DumbJustice

Reputation: 29

Thank you @Henk Holterman

The new class used a BlockingCollection - which solved all the problems :

 Task.Run(() =>
        {
            flow.ProcessExportEngineFlow();
        });

Task.Run(() =>
                {
                    while (reader.Read())
                    {
                           flow.IncreaseEngineFlow(webpage);
                    }

Class Definition :

 private BlockingCollection<WebPages> _concurrentWebPageList = new BlockingCollection<WebPages>(new ConcurrentQueue<WebPages>(), 1000);
    //private static ConcurrentQueue<WebPages> _concurrentWebPageList = new ConcurrentQueue<WebPages>();

    public void IncreaseEngineFlow(WebPages page)
    {
        _concurrentWebPageList.Add(page);
    }

    public WebPages DecreaseEngineFlow()
    {
        return _concurrentWebPageList.Take();
    }

    public void ProcessExportEngineFlow()
    {
        while(!_concurrentWebPageList.IsCompleted)
        {
            WebPages page = null;
            try
            {
                page = _concurrentWebPageList.Take();
            }
            catch (InvalidOperationException) { }

            if(page != null)
            {
                Console.WriteLine(page.URL);
            }
        }
    }

    public bool GetEngineState()
    {
        return _concurrentWebPageList.IsCompleted;
    }

    public void SetEngineCompleted()
    {
        _concurrentWebPageList.CompleteAdding();
    }

Upvotes: 2

Related Questions