civic.sir

Reputation: 410

C# multithreading: fill DataSet tables from the database in parallel

I have a small dilemma. I have a function that goes through a list of queries stored in a DataSet, each with its own connection string (which may be the same or different), fills a DataTable for each query, and returns the DataSet with all the tables filled in.

Currently the queries run one after another, so if one query takes 10 minutes and the other three take 2 minutes each, the total run time is 16 minutes.
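As a rough model, the sequential run time is the sum of the query durations, while the best-case parallel run time is the duration of the slowest query. This is an assumption: it ignores the fact that queries running at the same time compete for the database server's CPU, I/O and locks. A minimal C# sketch with a hypothetical RunTimeEstimate helper:

```csharp
using System;
using System.Linq;

static class RunTimeEstimate
{
    // Run one after another: every query waits for the previous one, so durations add up.
    public static int SequentialMinutes(int[] queryMinutes) => queryMinutes.Sum();

    // Best case when all queries start together: the total equals the slowest query,
    // assuming the queries do not slow each other down on the server.
    public static int ParallelMinutes(int[] queryMinutes) => queryMinutes.Max();
}
```

With the durations from the question, new[] { 10, 2, 2, 2 }, SequentialMinutes returns 16 and ParallelMinutes returns 10. A real parallel run can take longer than that lower bound if the queries contend for the same server.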

I was wondering whether I can use multithreading here: run each table fill on its own thread, which should bring the run time down to about 10 minutes. These are strictly DataTable fills (read-only; there won't be any update or delete calls).

This is what I have so far:

    public void FillDataSet(ref DataSet Source)
    { 
        foreach (var r in Source.Tables["queries"].Rows)
        {
            string query = r["QueryStatement"].ToString();
            string qSource = r["QuerySource"].ToString();
            string tableName = r["TableName"].ToString();

            DBConnection db = new DBConnection();
            var TempSource = Source;
            taskConnection = Task.Factory.StartNew(() => callDB(db, query, tableName, ref TempSource));
            Source = TempSource;
        }
        Task.WaitAll(taskConnection); 
    }

    private void callDB(DBConnection db, string query, string tableName, ref DataSet Source)
    {
        using (var sql = new SqlConnection(db.ConnectionString))
        {
            sql.Open();
            using (var adp = new SqlDataAdapter(query, sql))
            {
                adp.SelectCommand.CommandTimeout = 0;
                adp.Fill(Source, tableName);
            }
        }
    }

I had to create TempSource because a lambda expression cannot use a ref parameter (and I cannot change the method's signature). Currently this does not work, and I'm not sure what I'm doing wrong.
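A few things in the code above stand out. taskConnection is reassigned on every loop iteration, so Task.WaitAll(taskConnection) only waits for the last task started and earlier queries may still be running when FillDataSet returns. Every task also calls adp.Fill(Source, tableName) on the same DataSet, and DataSet is only safe for concurrent reads; writes must be synchronized. The ref is not needed either: DataSet is a reference type, so the method can modify the object it was given. Finally, DataRowCollection is non-generic, so foreach (var r in ...Rows) types r as object; the loop variable has to be declared as DataRow for r["QueryStatement"] to compile.

One possible fix, sketched here under assumptions: each query is wrapped in a Func<DataTable> loader (a hypothetical shape, not part of the question's code), each task fills its own private DataTable, every task is kept in a list for Task.WaitAll, and the tables are added to the DataSet on the calling thread once all tasks are done.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading.Tasks;

static class ParallelFill
{
    // Starts one task per query. Each task fills its own DataTable, so no two threads
    // write to the same object (DataSet is only safe for concurrent reads).
    public static void FillDataSet(DataSet source,
        IEnumerable<(string TableName, Func<DataTable> Load)> queries)
    {
        // Keep EVERY task. Reusing one variable inside the loop means
        // Task.WaitAll only sees the last task that was started.
        List<Task<DataTable>> tasks = queries
            .Select(q => Task.Run(() =>
            {
                DataTable loaded = q.Load();
                loaded.TableName = q.TableName;
                return loaded;
            }))
            .ToList();

        // Blocks until all tasks finish; a failed query surfaces as an AggregateException.
        Task.WaitAll(tasks.ToArray());

        // Back on the calling thread, so writing to the shared DataSet is safe here.
        foreach (Task<DataTable> task in tasks)
        {
            DataTable table = task.Result;
            if (source.Tables.Contains(table.TableName))
                source.Tables[table.TableName].Merge(table);
            else
                source.Tables.Add(table);
        }
    }
}
```

In real code each loader would open its own SqlConnection, create a SqlDataAdapter for its query, set SelectCommand.CommandTimeout = 0 as the question does, and call adp.Fill on a new DataTable. How the connection string is derived from QuerySource is not shown in the question, so that part is left to the DBConnection class.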

Upvotes: 0

Views: 1426

Answers (1)

tcwicks

Reputation: 505

Here is some basic boilerplate you can use. Fill in the part where I've left this comment:

    // Set your connectionstring and execute the query and fill your data here

This is a basic version. I've used dedicated threads instead of the thread pool because, compared with the amount of work each query does, the overhead of creating a new thread is negligible. You can extend this by keeping track of the threads and using signaling primitives (such as ManualResetEvent) to implement more advanced behavior.
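One way to keep track of the threads, sketched as a hypothetical ThreadJoinSketch.RunAndWait helper, is to store each Thread object and call Join on it. Thread.Join() blocks until that thread has terminated, so when the loop ends every worker has finished, and the shared counter with its Sleep(1000) polling loop is no longer needed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

static class ThreadJoinSketch
{
    // Starts threadCount dedicated threads that all run the same worker method
    // and returns only after every one of them has exited.
    public static void RunAndWait(int threadCount, ThreadStart worker)
    {
        var threads = new List<Thread>(threadCount);
        for (int i = 0; i < threadCount; i++)
        {
            var thread = new Thread(worker);
            threads.Add(thread);
            thread.Start();
        }

        // Join blocks until the given thread terminates, so no counter
        // or Sleep-based polling is needed to know when all work is done.
        foreach (Thread thread in threads)
        {
            thread.Join();
        }
    }
}
```

Inside the answer's class, a call such as ThreadJoinSketch.RunAndWait(NumThreadsToRunInParallel, ProcessPendingWork) could replace both the thread-starting loop and the polling loop.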

Additionally, if you want to pass extra parameters to the code that does the work, add them to the WorkItemDefinition class.
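For example, here is a sketch assuming you want each work item to carry its own connection string (a hypothetical ConnectionString property) and hand back its own result table (a hypothetical Result property). Each worker writes only to its own work item, so nothing shared is written concurrently; the tables are added to the DataSet on a single thread after all workers finish. The runQuery delegate is a placeholder for the real database call.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading;

// Hypothetical extension of the answer's WorkItemDefinition.
public class WorkItemDefinition
{
    public string Query { get; set; }
    public string QSource { get; set; }
    public string TableName { get; set; }
    public string ConnectionString { get; set; } // extra parameter (assumption)
    public DataTable Result { get; set; }        // written only by the thread that runs this item
}

static class WorkItemSketch
{
    // Runs one work item; runQuery stands in for e.g. a SqlDataAdapter.Fill into a new DataTable.
    public static void Execute(WorkItemDefinition work, Func<string, string, DataTable> runQuery)
    {
        DataTable table = runQuery(work.ConnectionString, work.Query);
        table.TableName = work.TableName;
        work.Result = table;
    }

    // Call after all worker threads have finished: copies the results into the DataSet on one thread.
    public static void Collect(DataSet target, IEnumerable<WorkItemDefinition> items)
    {
        foreach (WorkItemDefinition item in items)
        {
            target.Tables.Add(item.Result);
        }
    }
}
```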

Note: because the work queue and the thread counter are static, this does not support several concurrent calls to RunParallel.FillDataSet, but you could extend it to do so.
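A minimal sketch of that extension, assuming .NET Framework 4 or later for ConcurrentQueue<T>: move the queue out of the static field into a local variable and wait only on the threads that this call started. Two concurrent calls then have separate queues and separate threads. ReentrantRunner.Run and its process callback are hypothetical names, not part of the answer's code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

static class ReentrantRunner
{
    // Each call gets its own queue and its own worker threads, so calls do not interfere.
    public static void Run<T>(IEnumerable<T> workItems, int threadCount, Action<T> process)
    {
        var pending = new ConcurrentQueue<T>(workItems); // per call, not static
        var threads = new List<Thread>();
        for (int i = 0; i < threadCount; i++)
        {
            var thread = new Thread(() =>
            {
                // TryDequeue is thread-safe and returns false once the queue is empty.
                while (pending.TryDequeue(out T item))
                {
                    process(item);
                }
            });
            threads.Add(thread);
            thread.Start();
        }

        // Wait for this call's threads only.
        foreach (Thread thread in threads)
        {
            thread.Join();
        }
    }
}
```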

    using System.Collections.Generic;
    using System.Data;
    using System.Data.Common;
    using System.Data.Odbc; // on .NET Core / .NET 5+ this comes from the System.Data.Odbc NuGet package

    public static class RunParallel
    {
        const int NumThreadsToRunInParallel = 8; // Tune this for your DB server performance characteristics
        public static void FillDataSet(ref DataSet Source)
        {
            WorkItemDefinition Work;
            foreach (DataRow r in Source.Tables["queries"].Rows)
            {
                Work = new WorkItemDefinition();
                Work.Query = r["QueryStatement"].ToString();
                Work.QSource = r["QuerySource"].ToString();
                Work.TableName = r["TableName"].ToString();
                EnQueueWork(Work);
            }
            System.Threading.ThreadStart NewThreadStart;
            NewThreadStart = new System.Threading.ThreadStart(ProcessPendingWork);
            for (int I = 0; I < NumThreadsToRunInParallel; I++)
            {
                System.Threading.Thread NewThread;
                NewThread = new System.Threading.Thread(NewThreadStart);
                //NewThread.IsBackground = true; //Do this if you want to allow the application to quit before these threads finish all their work and exit
                ThreadCounterInc();
                NewThread.Start();
            }
            // Wait until every worker thread has decremented the counter on exit.
            while (ThreadCounterValue > 0)
            {
                System.Threading.Thread.Sleep(1000);
            }
        }

        private static void ProcessPendingWork()
        {
            try
            {
                WorkItemDefinition Work;
                // Dequeue in the loop condition so that every item, including the first,
                // is processed exactly once and the loop stops when the queue is empty.
                while ((Work = DeQueueWork()) != null)
                {
                    DbConnection db = new OdbcConnection();
                    // Set your connectionstring and execute the query and fill your data here
                    // DataSet/DataTable writes are not thread-safe: fill a separate DataTable per
                    // work item and add it to the DataSet after all threads have finished.
                }
            }
            finally
            {
                ThreadCounterDec();
            }
        }

        private static int ThreadCounter = 0;
        private static void ThreadCounterInc()
        {
            lock (SyncRoot)
            {
                ThreadCounter += 1;
            }
        }
        private static void ThreadCounterDec()
        {
            lock (SyncRoot)
            {
                ThreadCounter -= 1;
            }
        }
        private static int ThreadCounterValue
        {
            get
            {
                lock (SyncRoot)
                {
                    return ThreadCounter;
                }
            }
        }

        private static readonly object SyncRoot = new object();
        private static readonly Queue<WorkItemDefinition> m_PendingWork = new Queue<WorkItemDefinition>();
        private static Queue<WorkItemDefinition> PendingWork
        {
            get
            {
                return m_PendingWork;
            }
        }

        private static WorkItemDefinition DeQueueWork()
        {
            lock (SyncRoot)
            {
                // Checking Count is cheaper than catching the InvalidOperationException
                // that Dequeue throws on an empty queue.
                if (PendingWork.Count > 0)
                {
                    return PendingWork.Dequeue();
                }
            }
            return null;
        }

        private static void EnQueueWork(WorkItemDefinition Work)
        {
            lock (SyncRoot)
            {
                PendingWork.Enqueue(Work);
            }
        }

        public class WorkItemDefinition
        {
            public string Query { get; set; }
            public string QSource { get; set; }
            public string TableName { get; set; }
        }
    }

Upvotes: 1
