AlexDeme

Reputation: 13

Create a multithreaded application to run multiple queries in C#

I'm trying to build a Windows Forms tool that runs queries asynchronously. The app has a DataGridView listing 30 possible queries. The user checks the queries to execute, say 10 of them, and hits a button. A variable called maxThreads (3, for the sake of discussion) sets how many threads may run queries concurrently. The queries run against a production environment, and we don't want to overload the system with too many threads running at the same time. Each query runs for 30 seconds on average (some take 5 minutes, others 2 seconds).

The DataGridView has an image column containing an icon that depicts the status of each query (0 = available to be run, 1 = selected for running, 2 = running, 3 = successfully completed, -1 = error). I need to be able to update the UI every time a query starts and finishes. Once a query finishes, its results are displayed in a DataGridView contained in a TabControl (one tab per query).

The approach: I was thinking of creating maxThreads BackgroundWorkers and letting them run the queries. As each BackgroundWorker finishes, it notifies the UI and is assigned a new query, and so on until all queries have been run.

I tried using an assignmentWorker that dispatches the work to the background workers, but I don't know how to wait for all threads to finish. Once a background worker finishes, its RunWorkerCompleted handler reports progress through the assignmentWorker, but by then the assignmentWorker has already finished.

In the UI thread I call the assignment worker with all the queries that need to be run:

private void btnRunQueries_Click(object sender, EventArgs e)
    {
        if (AnyQueriesSelected())
        {
            tcResult.TabPages.Clear();

            foreach (DataGridViewRow dgr in dgvQueries.Rows)
            {
                if (Convert.ToBoolean(dgr.Cells["chk"].Value))
                {
                    Query q = new Query(dgr.Cells["ID"].Value.ToString(),
                        dgr.Cells["Name"].Value.ToString(),
                        dgr.Cells["FileName"].Value.ToString(),
                        dgr.Cells["ShortDescription"].Value.ToString(),
                        dgr.Cells["LongDescription"].Value.ToString(),
                        dgr.Cells["Level"].Value.ToString(),
                        dgr.Cells["Task"].Value.ToString(),
                        dgr.Cells["Importance"].Value.ToString(),
                        dgr.Cells["SkillSet"].Value.ToString(),
                        false,
                        new Dictionary<string, string>() 
                        { { "#ClntNb#", txtClntNum.Text }, { "#Staff#", "100300" } });

                    qryList.Add(q);
                }
            }
            assignmentWorker.RunWorkerAsync(qryList);
        }
        else
        {
            MessageBox.Show("Please select at least one query.",
                            "Warning",
                            MessageBoxButtons.OK,
                            MessageBoxIcon.Information);
        }
    }

Here is the AssignmentWorker:

private void assignmentWorker_DoWork(object sender, DoWorkEventArgs e)
    {
        foreach (Query q in (List<Query>)e.Argument)
        {
            while (!q.Processed)
            {
                for (int threadNum = 0; threadNum < maxThreads; threadNum++)
                {
                    if (!threadArray[threadNum].IsBusy)
                    {
                        threadArray[threadNum].RunWorkerAsync(q);
                        q.Processed = true;
                        assignmentWorker.ReportProgress(1, q);
                        break;
                    }
                }

                //If all threads are being used, sleep awhile before checking again
                if (!q.Processed)
                {
                    Thread.Sleep(500);
                }
            }
        }
    }

All the background workers share the same DoWork handler:

private void backgroundWorkerFiles_DoWork(object sender, DoWorkEventArgs e)
    {
        try
        {
            Query qry = (Query)e.Argument;

            DataTable dtNew = DataAccess.RunQuery(qry).dtResult;

            if (dsQryResults.Tables.Contains(dtNew.TableName))
            {
                dsQryResults.Tables.Remove(dtNew.TableName);
            }

            dsQryResults.Tables.Add(dtNew);

            e.Result = qry;
        }
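        catch (Exception ex)
        {
            // Note: swallowing the exception here means RunWorkerCompleted
            // always sees e.Error == null, so its error branch never runs.
        }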
    }

Once the Query has returned and the DataTable has been added to the dataset:

private void backgroundWorkerFiles_RunWorkerCompleted(object sender, 
                                                    RunWorkerCompletedEventArgs e)
    {
        try
        {
            if (e.Error != null)
            {
                assignmentWorker.ReportProgress(-1, e.Result);
            }
            else
            {
                assignmentWorker.ReportProgress(2, e.Result);
            }
        }
        catch (Exception ex)
        {
            int o = 0;
        }
    }

The problem I have is that the assignment worker finishes before the background workers do, so the calls to assignmentWorker.ReportProgress go to hell (excuse my French). How can I wait for all the launched background workers to finish before finishing the assignment worker?

Thank you!

Upvotes: 1

Views: 559

Answers (1)

Peter Duniho

Reputation: 70701

As noted in the comment above, you have overcomplicated your design. If you have a specific maximum number of tasks (queries) that should be executing concurrently, you can and should simply create that number of workers, and have them consume tasks from your queue (or list) of tasks until that queue is empty.

Lacking a good Minimal, Complete, and Verifiable code example that concisely and clearly illustrates your specific scenario, it's not feasible to provide code that would directly address your question. But, here's an example using a List<T> as your original code does, which will work as I describe above:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace TestSO42101517WaitAsyncTasks
{
    class Program
    {
        static void Main(string[] args)
        {
            Random random = new Random();
            int maxTasks = 30,
                maxActive = 3,
                maxDelayMs = 1000,
                currentDelay = -1;
            List<TimeSpan> taskDelays = new List<TimeSpan>(maxTasks);

            for (int i = 0; i < maxTasks; i++)
            {
                taskDelays.Add(TimeSpan.FromMilliseconds(random.Next(maxDelayMs)));
            }

            Task[] tasks = new Task[maxActive];
            object o = new object();

            for (int i = 0; i < maxActive; i++)
            {
                int workerIndex = i;

                tasks[i] = Task.Run(() =>
                {
                    DelayConsumer(ref currentDelay, taskDelays, o, workerIndex);
                });
            }

            Console.WriteLine("Waiting for consumer tasks");

            Task.WaitAll(tasks);

            Console.WriteLine("All consumer tasks completed");
        }

        private static void DelayConsumer(ref int currentDelay, List<TimeSpan> taskDelays, object o, int workerIndex)
        {
            Console.WriteLine($"worker #{workerIndex} starting");

            while (true)
            {
                TimeSpan delay;    
                int delayIndex;

                lock (o)
                {
                    delayIndex = ++currentDelay;
                    if (delayIndex < taskDelays.Count)
                    {
                        delay = taskDelays[delayIndex];
                    }
                    else
                    {
                        Console.WriteLine($"worker #{workerIndex} exiting");
                        return;
                    }
                }

                Console.WriteLine($"worker #{workerIndex} sleeping for {delay.TotalMilliseconds} ms, task #{delayIndex}");
                System.Threading.Thread.Sleep(delay);
            }
        }
    }
}

In your case, each worker would report progress to some global state. You don't show the ReportProgress handler for your "assignment" worker, so I can't say specifically what this would look like. But presumably it would involve passing either -1 or 2 to some method that knows what to do with those values (i.e. what would otherwise have been your ReportProgress handler).
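
As a minimal sketch of what that reporting could look like (an assumption, since the real handler isn't shown): each worker loop takes a callback and passes it a status code whenever a query starts, completes, or fails. The names QueryWorker, Consume, runQuery and reportStatus are hypothetical, queries are represented by plain strings, and the status values are borrowed from the icon legend in the question. A ConcurrentQueue<string> from System.Collections.Concurrent holds the pending work.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper for illustration; none of these names appear in the original code.
public static class QueryWorker
{
    // Status codes borrowed from the icon legend in the question.
    public const int Running = 2;
    public const int Completed = 3;
    public const int Failed = -1;

    // One worker's loop: take the next query name, run it, report the outcome.
    // Several workers can call this concurrently on the same queue.
    public static void Consume(
        ConcurrentQueue<string> pending,
        Action<string> runQuery,
        Action<string, int> reportStatus)
    {
        string name;

        while (pending.TryDequeue(out name))
        {
            reportStatus(name, Running);

            try
            {
                runQuery(name);
                reportStatus(name, Completed);
            }
            catch (Exception)
            {
                // A failed query is reported; the worker carries on with the next one.
                reportStatus(name, Failed);
            }
        }
    }
}
```

The callback runs on whichever worker thread called it, so in a forms app it would presumably have to marshal back to the UI thread before touching the grid, for example with Control.BeginInvoke or a Progress<T> created on the UI thread.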

Note that the code can be simplified somewhat, particularly where the individual tasks are consumed, if you use an actual queue data structure for the tasks. That approach would look something like this:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace TestSO42101517WaitAsyncTasks
{
    class Program
    {
        static void Main(string[] args)
        {
            Random random = new Random();
            int maxTasks = 30,
                maxActive = 3,
                maxDelayMs = 1000,
                currentDelay = -1;
            ConcurrentQueue<TimeSpan> taskDelays = new ConcurrentQueue<TimeSpan>();

            for (int i = 0; i < maxTasks; i++)
            {
                taskDelays.Enqueue(TimeSpan.FromMilliseconds(random.Next(maxDelayMs)));
            }

            Task[] tasks = new Task[maxActive];

            for (int i = 0; i < maxActive; i++)
            {
                int workerIndex = i;

                tasks[i] = Task.Run(() =>
                {
                    DelayConsumer(ref currentDelay, taskDelays, workerIndex);
                });
            }

            Console.WriteLine("Waiting for consumer tasks");

            Task.WaitAll(tasks);

            Console.WriteLine("All consumer tasks completed");
        }

        private static void DelayConsumer(ref int currentDelayIndex, ConcurrentQueue<TimeSpan> taskDelays, int workerIndex)
        {
            Console.WriteLine($"worker #{workerIndex} starting");

            while (true)
            {
                TimeSpan delay;

                if (!taskDelays.TryDequeue(out delay))
                {
                    Console.WriteLine($"worker #{workerIndex} exiting");
                    return;
                }

                int delayIndex = System.Threading.Interlocked.Increment(ref currentDelayIndex);

                Console.WriteLine($"worker #{workerIndex} sleeping for {delay.TotalMilliseconds} ms, task #{delayIndex}");
                System.Threading.Thread.Sleep(delay);
            }
        }
    }
}
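
Both console examples block in Task.WaitAll. In a forms app, doing that on the UI thread would freeze the window until every query finishes, so there the wait would more likely be an await on Task.WhenAll. Here is a minimal sketch of that variation; NonBlockingWait and DrainAsync are made-up names for illustration, and the sleep again stands in for running a query.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration only.
public static class NonBlockingWait
{
    // Starts maxActive consumers and returns a task that completes once the
    // queue is empty and every consumer has exited. The result is the total
    // number of items handled.
    public static async Task<int> DrainAsync(ConcurrentQueue<TimeSpan> taskDelays, int maxActive)
    {
        Task<int>[] consumers = Enumerable.Range(0, maxActive)
            .Select(workerIndex => Task.Run(() =>
            {
                int handled = 0;
                TimeSpan delay;

                while (taskDelays.TryDequeue(out delay))
                {
                    Thread.Sleep(delay); // stand-in for running one query
                    handled++;
                }

                return handled;
            }))
            .ToArray();

        // Awaiting here yields to the caller instead of blocking its thread.
        int[] counts = await Task.WhenAll(consumers);
        return counts.Sum();
    }
}
```

A button handler declared as async void could then write `int done = await NonBlockingWait.DrainAsync(taskDelays, 3);` and update the UI on the following line, since in Windows Forms the code after the await normally resumes on the UI thread.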

Upvotes: 1
