vicky
vicky

Reputation: 23

Processing of multiple threads

I am using threads to upload images to an FTP server, and I have a problem limiting the number of threads. When I create as many threads as there are images, it works fine. But now I want to create at most, say, 5 threads to upload 100 or more images. I have a DataTable containing these 100 images, with a unique ID field that stores 0, 1, 2, 3, ... and so on, one value per image. I want to start only five threads at first, so that 5 images begin uploading in parallel. On a Timer, I check the status of the threads; if I find a thread that is no longer alive, I want to assign it the 6th image to upload, and likewise, if I find another thread that has finished its upload, I want to give it the 7th image, and so on, i.e. this process should run until all 100 images are uploaded. Can you please suggest a structure with which I can achieve this? Currently I am creating 100 threads for 100 images and it works perfectly, but I am worried about creating that many threads. Will that affect performance?

My Current Code is:

// Page-level field: holds one worker thread per row of dt.
Thread[] tr = null;

// In the control's Load handler: build one background thread for every row.
// Each thread is named after the "Id" value of the row at the same index.
tr = new Thread[dt.Rows.Count];
int slot = 0;
while (slot < dt.Rows.Count)
{
    Thread worker = new Thread(new ThreadStart(ProcessItems));
    worker.Name = Convert.ToString(dt.Rows[slot]["Id"]);
    worker.IsBackground = true;
    tr[slot] = worker;
    slot++;
}

// Launch the workers only after every one of them has been created.
for (int k = 0; k < tr.Length; k++)
{
    tr[k].Start();
}

    //The method which is used to upload images
    // Lock object taken by ProcessItems for its entire body.
    public object tLock = new object();

    /// <summary>
    /// Thread entry point. Uploads the image whose row Id equals the current
    /// thread's Name to the FTP server and flags that row as processed.
    /// </summary>
    /// <remarks>
    /// The whole body runs inside lock (tLock), so no two threads execute the
    /// upload at the same time, however many threads have been started.
    /// When the upload fails, the exception message, the image ID and the file
    /// name are appended to ErrorText, Image_IDsToDelete and NotUploadedFiles,
    /// and the row is still flagged Is_Uploaded. Any other exception is shown
    /// in a message box.
    /// </remarks>
    private void ProcessItems()
    {
        lock (tLock)
        {
            try
            {
                dr = dt.Select("Is_Uploaded=0 And Id=" + Thread.CurrentThread.Name).FirstOrDefault();
                if (dr == null)
                {
                    // No pending row carries this Id, so there is nothing to upload.
                    return;
                }

                // NOTE(review): pbCtl is looked up and updated from a worker thread;
                // confirm that uxImageAndProgress tolerates cross-thread access.
                uxImageAndProgress pbCtl = panelControl1.Controls[dr["Image_ID"].ToString()] as uxImageAndProgress;

                string remotePath;
                if (ftpPath == "")
                {
                    remotePath = Global.FTPRemotePath + "/ProductImages/" + dr["Image_ID"] + dr["Extension"].ToString();
                }
                else
                {
                    remotePath = ftpPath + dr["Image_ID"] + dr["Extension"].ToString();
                }

                // Direct cast: a missing/DBNull payload fails here with a descriptive
                // InvalidCastException instead of a NullReferenceException later on.
                byte[] data = (byte[])dr["ActualData"];

                if (ftpRequest == null)
                {
                    try
                    {
                        ftpRequest = (FtpWebRequest)FtpWebRequest.Create(new Uri(remotePath));
                        ftpRequest.Method = WebRequestMethods.Ftp.UploadFile;
                        ftpRequest.Credentials = new NetworkCredential(Global.FTPLogIn, Global.FTPPassword);
                        ftpRequest.UsePassive = true;
                        ftpRequest.UseBinary = true;
                        ftpRequest.KeepAlive = true;
                        ftpRequest.Timeout = 20000;
                        ftpRequest.ContentLength = data.Length;

                        // Same chunk size as before. The bytes are written straight from the
                        // array, so the loop always terminates (it no longer depends on a
                        // stream position that another helper may have moved).
                        const int ChunkSize = 4097;

                        // using closes the request stream even when a write throws.
                        using (Stream rs = ftpRequest.GetRequestStream())
                        {
                            for (int offset = 0; offset < data.Length; offset += ChunkSize)
                            {
                                rs.Write(data, offset, Math.Min(ChunkSize, data.Length - offset));
                            }
                        }

                        dr["Is_Uploaded"] = 1;
                        dt.AcceptChanges();
                        if (pbCtl != null)
                        {
                            pbCtl.Is_Uploaded = true;
                        }
                    }
                    catch (Exception eeex)
                    {
                        // Collect the failure details as comma-separated lists.
                        ErrorText = ErrorText == "" ? eeex.Message : ErrorText + "," + eeex.Message;

                        string failedId = dr["Image_ID"].ToString();
                        Image_IDsToDelete = Image_IDsToDelete == "" ? failedId : Image_IDsToDelete + "," + failedId;

                        string failedFile = Convert.ToString(dr["FileName"]);
                        NotUploadedFiles = NotUploadedFiles == "" ? failedFile : NotUploadedFiles + ", " + failedFile;

                        // The row is flagged as processed even though the upload failed.
                        dr["Is_Uploaded"] = true;
                        dt.AcceptChanges();
                        if (pbCtl != null)
                        {
                            pbCtl.Is_Uploaded = true;
                            pbCtl.Is_WithError = true;
                        }
                    }
                    finally
                    {
                        // Reset the shared request field on both the success and the failure path.
                        ftpRequest = null;
                    }
                }
            }
            catch (Exception ex)
            {
                XtraMessageBox.Show(ex.Message.ToString(), Global.Header, MessageBoxButtons.OK);
            }
        }
    }
//The Timer Event on which I am checking the Status of threads and taking appropriate action
    /// <summary>
    /// Timer callback. Does nothing while any thread in tr is still alive. Once
    /// none is alive it stops the timer, deletes the images listed in
    /// Image_IDsToDelete, reports the files in NotUploadedFiles when ErrorText
    /// is not empty, and sets Is_Done.
    /// </summary>
    private void timer1_Tick(object sender, EventArgs e)
    {
        // Threads have not been created yet: nothing to check on this tick.
        if (tr == null)
        {
            return;
        }

        // Keep waiting while at least one worker is still running.
        // (This is where a finished thread would have to be replaced by a new one
        // to hand out the next image; the original attempt at that was disabled.)
        if (Array.Exists(tr, t => t.IsAlive))
        {
            return;
        }

        timer1.Enabled = false;

        if (Image_IDsToDelete != "")
        {
            // NOTE(review): the ID list is concatenated into the SQL text; this is only
            // safe if Image_ID values are numeric and trusted - confirm.
            RetailHelper.ExecuteNonQuery("Delete from images where Image_ID in (" + Image_IDsToDelete + ")");
        }

        if (ErrorText != "")
        {
            // Terminate the list with a single period. Previously the period was appended
            // here and again in the message expression, so the text ended with "..".
            NotUploadedFiles = NotUploadedFiles + ".";
            XtraMessageBox.Show("Unable to connect to server. The following files were not uploaded:" + System.Environment.NewLine + NotUploadedFiles, Global.Header, MessageBoxButtons.OK, MessageBoxIcon.Information);
        }

        Is_Done = true;
    }

Now, I want to convert this code to use a fixed number of threads. Please help me. Thank you!

Upvotes: 0

Views: 108

Answers (2)

Alireza
Alireza

Reputation: 5056

Use a Semaphore; it is good enough. You can polish the code yourself.

    // Class-level fields. They must be fields (not locals) because both the
    // launcher code and ProcessItems use them.
    const int maxThreads = 5;
    Semaphore sm = new Semaphore(maxThreads, maxThreads); // one permit per running upload thread
    readonly object stateLock = new object();             // guards dt and the shared error-report strings

    // ---- In the method that starts the upload (e.g. the control's Load handler) ----

    // Snapshot the Ids before any worker starts, so dt is not read here while
    // workers are updating it.
    string[] ids = new string[dt.Rows.Count];
    for (int i = 0; i < ids.Length; i++)
    {
        ids[i] = Convert.ToString(dt.Rows[i]["Id"]);
    }

    foreach (string id in ids)
    {
        // Take a permit BEFORE creating the thread: this blocks while maxThreads
        // uploads are in flight, so no more than maxThreads workers run at once.
        // The permit is handed to the worker, which releases it when it finishes.
        sm.WaitOne();
        bool started = false;
        try
        {
            Thread tr = new Thread(new ThreadStart(ProcessItems));
            tr.Name = id;
            tr.IsBackground = true;

            tr.Start();
            started = true;
        }
        finally
        {
            // If the worker never started, nobody else will give this permit back.
            if (!started)
            {
                sm.Release();
            }
        }
    }

    // You don't need the timer anymore.
    // Every running worker holds one permit, so once all maxThreads permits can
    // be taken here, every upload has finished.
    // NOTE(review): this blocks the calling thread until all uploads are done; if that
    // is the UI thread, consider running this launcher code on a background thread.
    for (int i = 0; i < maxThreads; i++)
    {
        sm.WaitOne();
    }

    sm.Release(maxThreads);

    if (Image_IDsToDelete != "")
    {
        RetailHelper.ExecuteNonQuery("Delete from images where Image_ID in (" + Image_IDsToDelete + ")");
    }

    if (ErrorText != "")
    {
        // Terminate the list with a single period (it used to be appended twice).
        NotUploadedFiles = NotUploadedFiles + ".";
        XtraMessageBox.Show("Unable to connect to server. The following files were not uploaded:" + System.Environment.NewLine + NotUploadedFiles, Global.Header, MessageBoxButtons.OK, MessageBoxIcon.Information);
    }


    //The method which is used to upload images
    /// <summary>
    /// Thread entry point. Uploads the image whose row Id equals the current
    /// thread's Name, then releases the semaphore permit that the launcher
    /// acquired on this thread's behalf.
    /// </summary>
    /// <remarks>
    /// Up to maxThreads instances run concurrently, so the row and the FTP request
    /// are kept in locals, and every access to dt and to the shared strings
    /// (ErrorText, Image_IDsToDelete, NotUploadedFiles) is done under stateLock.
    /// </remarks>
    private void ProcessItems()
    {
        try
        {
            DataRow row;
            string imageId;
            string extension;
            string fileName;
            byte[] data;

            // DataTable is only safe for concurrent reads; workers also write to it,
            // so reads are synchronized as well.
            lock (stateLock)
            {
                row = dt.Select("Is_Uploaded=0 And Id=" + Thread.CurrentThread.Name).FirstOrDefault();
                if (row == null)
                {
                    // No pending row carries this Id, so there is nothing to upload.
                    return;
                }

                imageId = row["Image_ID"].ToString();
                extension = row["Extension"].ToString();
                fileName = Convert.ToString(row["FileName"]);
                data = (byte[])row["ActualData"];
            }

            // NOTE(review): pbCtl is looked up and updated from a worker thread;
            // confirm that uxImageAndProgress tolerates cross-thread access.
            uxImageAndProgress pbCtl = panelControl1.Controls[imageId] as uxImageAndProgress;

            string remotePath;
            if (ftpPath == "")
            {
                remotePath = Global.FTPRemotePath + "/ProductImages/" + imageId + extension;
            }
            else
            {
                remotePath = ftpPath + imageId + extension;
            }

            try
            {
                // Local request: a shared request field would make concurrent workers
                // see each other's request and skip their own upload.
                FtpWebRequest request = (FtpWebRequest)FtpWebRequest.Create(new Uri(remotePath));
                request.Method = WebRequestMethods.Ftp.UploadFile;
                request.Credentials = new NetworkCredential(Global.FTPLogIn, Global.FTPPassword);
                request.UsePassive = true;
                request.UseBinary = true;
                request.KeepAlive = true;
                request.Timeout = 20000;
                request.ContentLength = data.Length;

                const int ChunkSize = 4097;

                // using closes the request stream even when a write throws.
                using (Stream rs = request.GetRequestStream())
                {
                    for (int offset = 0; offset < data.Length; offset += ChunkSize)
                    {
                        rs.Write(data, offset, Math.Min(ChunkSize, data.Length - offset));
                    }
                }

                lock (stateLock)
                {
                    row["Is_Uploaded"] = 1;
                    dt.AcceptChanges();
                }

                if (pbCtl != null)
                {
                    pbCtl.Is_Uploaded = true;
                }
            }
            catch (Exception eeex)
            {
                lock (stateLock)
                {
                    // Collect the failure details as comma-separated lists.
                    ErrorText = ErrorText == "" ? eeex.Message : ErrorText + "," + eeex.Message;
                    Image_IDsToDelete = Image_IDsToDelete == "" ? imageId : Image_IDsToDelete + "," + imageId;
                    NotUploadedFiles = NotUploadedFiles == "" ? fileName : NotUploadedFiles + ", " + fileName;

                    // The row is flagged as processed even though the upload failed.
                    row["Is_Uploaded"] = true;
                    dt.AcceptChanges();
                }

                if (pbCtl != null)
                {
                    pbCtl.Is_Uploaded = true;
                    pbCtl.Is_WithError = true;
                }
            }
        }
        catch (Exception ex)
        {
            XtraMessageBox.Show(ex.Message.ToString(), Global.Header, MessageBoxButtons.OK);
        }
        finally
        {
            // Give back the permit the launcher took before starting this thread.
            sm.Release();
        }
    }

Upvotes: 1

Pete S
Pete S

Reputation: 275

It sounds like a producer/consumer queue is the structure you are looking for. Take a look at this answer and the others in the thread for examples of how to employ it.

Upvotes: 0

Related Questions