Reputation: 23
I am using threads to upload images to an FTP server, and I have a problem limiting the number of threads. When I create as many threads as there are images, everything works fine. But now I want to create at most 5 threads, say, to upload 100 or more images. I have a DataTable containing these 100 images, with a unique ID field that stores 0, 1, 2, 3, ... and so on, one value per image. I want to start only five threads initially so that 5 images are uploaded in parallel. On a Timer, I check the status of the threads; if I find a thread that is no longer alive, I want to assign it the 6th image to upload, and in the same way, when another thread finishes its work, I want to give it the 7th image, and so on, until all 100 images have been uploaded. Can you please suggest a structure with which I can achieve this? Currently I am creating 100 threads for 100 images and it works perfectly, but I am afraid of creating that many threads. Will that affect performance?
My Current Code is:
// A page level variable: one worker thread per image row
Thread[] tr = null;

//On Load of the Control
// The array is sized by the number of rows in dt
// (sizing it by MaxID instead was an alternative that is not in use).
tr = new Thread[dt.Rows.Count];
for (int index = 0; index < dt.Rows.Count; index++)
{
    Thread worker = new Thread(ProcessItems);
    tr[index] = worker;
    // ProcessItems finds its row through the thread name, so the name carries the row's Id.
    worker.Name = Convert.ToString(dt.Rows[index]["Id"]);
    worker.IsBackground = true;
}

// Start the workers only after every one of them has been created and configured.
for (int index = 0; index < tr.Length; index++)
{
    tr[index].Start();
}
//The method which is used to upload images
public object tLock = new object();

/// <summary>
/// Uploads the image row whose "Id" equals the current thread's name to the FTP server.
/// </summary>
/// <remarks>
/// The whole body runs inside <c>lock (tLock)</c>, so the shared <c>dr</c> and
/// <c>ftpRequest</c> fields are touched by one thread at a time; as a consequence the
/// uploads themselves are serialized.
/// A failed upload is still flagged as processed (<c>Is_Uploaded</c>) and is recorded in
/// <c>ErrorText</c>, <c>Image_IDsToDelete</c> and <c>NotUploadedFiles</c>.
/// Any other exception is shown to the user in a message box.
/// </remarks>
private void ProcessItems()
{
    lock (tLock)
    {
        try
        {
            dr = dt.Select("Is_Uploaded=0 And Id=" + Thread.CurrentThread.Name).FirstOrDefault();
            if (dr == null)
            {
                // No pending row for this Id: nothing to upload.
                return;
            }

            // May be null when no matching control exists; every use below is null-checked.
            uxImageAndProgress pbCtl = panelControl1.Controls[dr["Image_ID"].ToString()] as uxImageAndProgress;

            string remotePath;
            if (ftpPath == "")
            {
                remotePath = Global.FTPRemotePath + "/ProductImages/" + dr["Image_ID"] + dr["Extension"].ToString();
            }
            else
            {
                remotePath = ftpPath + dr["Image_ID"] + dr["Extension"].ToString();
            }

            byte[] data = (byte[])dr["ActualData"];
            int length = data.Length;

            if (ftpRequest == null)
            {
                try
                {
                    #region New Code
                    ftpRequest = (FtpWebRequest)FtpWebRequest.Create(new Uri(remotePath));
                    ftpRequest.Method = WebRequestMethods.Ftp.UploadFile;
                    ftpRequest.Credentials = new NetworkCredential(Global.FTPLogIn, Global.FTPPassword);
                    ftpRequest.UsePassive = true;
                    ftpRequest.UseBinary = true;
                    ftpRequest.KeepAlive = true;
                    ftpRequest.Timeout = 20000;
                    ftpRequest.ContentLength = length;

                    // 'using' closes the request stream even when a write fails.
                    using (System.IO.Stream rs = ftpRequest.GetRequestStream())
                    {
                        // Send the bytes straight from the array in chunks; the offset always
                        // advances, so the loop cannot spin on a zero-length read.
                        const int chunkSize = 4097;
                        int offset = 0;
                        while (offset < length)
                        {
                            int count = Math.Min(chunkSize, length - offset);
                            rs.Write(data, offset, count);
                            offset += count;
                        }
                    }

                    // Flag success only after the stream has been closed without error.
                    dr["Is_Uploaded"] = 1;
                    dt.AcceptChanges();
                    ftpRequest = null;
                    if (pbCtl != null)
                    {
                        pbCtl.Is_Uploaded = true;
                    }
                    #endregion
                }
                catch (Exception eeex)
                {
                    ftpRequest = null;

                    ErrorText = (ErrorText == "")
                        ? eeex.Message.ToString()
                        : ErrorText + "," + eeex.Message.ToString();

                    Image_IDsToDelete = (Image_IDsToDelete == "")
                        ? dr["Image_ID"].ToString()
                        : Image_IDsToDelete + "," + dr["Image_ID"].ToString();

                    NotUploadedFiles = (NotUploadedFiles == "")
                        ? Convert.ToString(dr["FileName"])
                        : NotUploadedFiles + ", " + Convert.ToString(dr["FileName"]);

                    // The row is marked as processed even on failure so it is not picked up again.
                    dr["Is_Uploaded"] = true;
                    dt.AcceptChanges();
                    if (pbCtl != null)
                    {
                        pbCtl.Is_Uploaded = true;
                        pbCtl.Is_WithError = true;
                    }
                }
            }
        }
        catch (Exception ex)
        {
            XtraMessageBox.Show(ex.Message.ToString(), Global.Header, MessageBoxButtons.OK);
        }
    }
}
//The Timer Event on which I am checking the Status of threads and taking appropriate action
/// <summary>
/// Polls the upload threads. Once none of them is alive it stops the timer, issues a DELETE
/// against the images table for the images that failed to upload, reports the files that
/// were not uploaded and sets <c>Is_Done</c>.
/// </summary>
/// <param name="sender">The timer raising the event (not used).</param>
/// <param name="e">Event data (not used).</param>
private void timer1_Tick(object sender, EventArgs e)
{
    // The thread array is null until the workers have been created; nothing to poll yet.
    if (tr == null)
    {
        return;
    }

    foreach (Thread x in tr)
    {
        if (x.IsAlive)
        {
            // At least one upload is still running; check again on the next tick.
            return;
        }
    }

    // Every thread has finished: stop polling and wrap up.
    timer1.Enabled = false;

    if (Image_IDsToDelete != "")
    {
        // NOTE(review): the id list is concatenated into the SQL text; presumably the ids are
        // numeric values taken from the data table - confirm, or switch to a parameterized query.
        RetailHelper.ExecuteNonQuery("Delete from images where Image_ID in (" + Image_IDsToDelete + ")");
    }

    if (ErrorText != "")
    {
        // The terminating period is appended here once; the message must not add a second one.
        NotUploadedFiles = NotUploadedFiles + ".";
        XtraMessageBox.Show("Unable to connect to server. The following files were not uploaded:" + System.Environment.NewLine + NotUploadedFiles, Global.Header, MessageBoxButtons.OK, MessageBoxIcon.Information);
    }

    Is_Done = true;
}
Now, I want to convert this code to use a fixed number of threads. Please help me. Thanking you!
Upvotes: 0
Views: 108
Reputation: 5056
Use a Semaphore; it is good enough. You can polish the code yourself.
const int maxThreads = 5;
Semaphore sm = new Semaphore(maxThreads, maxThreads); // maximum concurrent threads

// Keep a handle on every started thread so that completion can be awaited reliably.
Thread[] workers = new Thread[dt.Rows.Count];
for (int i = 0; i < workers.Length; i++)
{
    // Wait for a free upload slot before spawning the next thread. The slot is handed back
    // right after the thread has started because ProcessItems acquires its own slot; this
    // throttles thread creation, while the limit on concurrent uploads is enforced inside
    // ProcessItems. The wait sits outside the try block so that a failed wait never releases
    // a slot that was not acquired.
    sm.WaitOne();
    try
    {
        Thread worker = new Thread(new ThreadStart(ProcessItems));
        worker.Name = Convert.ToString(dt.Rows[i]["Id"]);
        worker.IsBackground = true;
        worker.Start();
        workers[i] = worker;
    }
    finally
    {
        sm.Release();
    }
}

// You don't need the timer anymore
// Wait for every started thread to finish. Draining the semaphore is not a reliable signal:
// this thread could take all the slots before the most recently started threads have
// entered ProcessItems, and the clean-up below would then run while uploads are pending.
// Join blocks the calling thread until the uploads are done.
foreach (Thread started in workers)
{
    if (started != null)
    {
        started.Join();
    }
}

if (Image_IDsToDelete != "")
{
    // NOTE(review): the id list is concatenated into the SQL text; presumably the ids are
    // numeric values taken from the data table - confirm, or switch to a parameterized query.
    RetailHelper.ExecuteNonQuery("Delete from images where Image_ID in (" + Image_IDsToDelete + ")");
}
if (ErrorText != "")
{
    // The terminating period is appended here once; the message must not add a second one.
    NotUploadedFiles = NotUploadedFiles + ".";
    XtraMessageBox.Show("Unable to connect to server. The following files were not uploaded:" + System.Environment.NewLine + NotUploadedFiles, Global.Header, MessageBoxButtons.OK, MessageBoxIcon.Information);
}
//The method which is used to upload images
// Guards the state shared by all upload threads: the data table and the accumulated
// ErrorText / Image_IDsToDelete / NotUploadedFiles strings.
private readonly object uploadStateLock = new object();

/// <summary>
/// Uploads the image row whose "Id" equals the current thread's name to the FTP server,
/// holding one semaphore slot for the duration of the upload.
/// </summary>
/// <remarks>
/// Several threads run this method at the same time, so the row and the FTP request are kept
/// in locals instead of shared fields, and every access to the data table or to the shared
/// error strings happens under <c>uploadStateLock</c>. The network transfer itself runs
/// outside the lock so that uploads can proceed in parallel.
/// A failed upload is still flagged as processed (<c>Is_Uploaded</c>) and is recorded in
/// <c>ErrorText</c>, <c>Image_IDsToDelete</c> and <c>NotUploadedFiles</c>.
/// Any other exception is shown to the user in a message box.
/// NOTE(review): the progress control is updated from the worker thread, as before - confirm
/// that the control tolerates cross-thread access.
/// </remarks>
private void ProcessItems()
{
    // Acquire outside the try block so that a failed wait never releases a slot
    // this thread does not hold.
    sm.WaitOne();
    try
    {
        try
        {
            System.Data.DataRow row;
            string imageId = null;
            string extension = null;
            byte[] data = null;

            // Take a snapshot of everything the upload needs while holding the lock,
            // because other threads write to the same table.
            lock (uploadStateLock)
            {
                row = dt.Select("Is_Uploaded=0 And Id=" + Thread.CurrentThread.Name).FirstOrDefault();
                if (row != null)
                {
                    imageId = row["Image_ID"].ToString();
                    extension = row["Extension"].ToString();
                    data = (byte[])row["ActualData"];
                }
            }

            if (row == null)
            {
                // No pending row for this Id: nothing to upload (the slot is released below).
                return;
            }

            // May be null when no matching control exists; every use below is null-checked.
            uxImageAndProgress pbCtl = panelControl1.Controls[imageId] as uxImageAndProgress;

            string remotePath;
            if (ftpPath == "")
            {
                remotePath = Global.FTPRemotePath + "/ProductImages/" + imageId + extension;
            }
            else
            {
                remotePath = ftpPath + imageId + extension;
            }

            int length = data.Length;

            try
            {
                #region New Code
                FtpWebRequest request = (FtpWebRequest)FtpWebRequest.Create(new Uri(remotePath));
                request.Method = WebRequestMethods.Ftp.UploadFile;
                request.Credentials = new NetworkCredential(Global.FTPLogIn, Global.FTPPassword);
                request.UsePassive = true;
                request.UseBinary = true;
                request.KeepAlive = true;
                request.Timeout = 20000;
                request.ContentLength = length;

                // 'using' closes the request stream even when a write fails.
                using (System.IO.Stream rs = request.GetRequestStream())
                {
                    // Send the bytes straight from the array in chunks; the offset always
                    // advances, so the loop cannot spin on a zero-length read.
                    const int chunkSize = 4097;
                    int offset = 0;
                    while (offset < length)
                    {
                        int count = Math.Min(chunkSize, length - offset);
                        rs.Write(data, offset, count);
                        offset += count;
                    }
                }

                // Flag success only after the stream has been closed without error.
                lock (uploadStateLock)
                {
                    row["Is_Uploaded"] = 1;
                    dt.AcceptChanges();
                }
                if (pbCtl != null)
                {
                    pbCtl.Is_Uploaded = true;
                }
                #endregion
            }
            catch (Exception eeex)
            {
                lock (uploadStateLock)
                {
                    ErrorText = (ErrorText == "")
                        ? eeex.Message.ToString()
                        : ErrorText + "," + eeex.Message.ToString();

                    Image_IDsToDelete = (Image_IDsToDelete == "")
                        ? imageId
                        : Image_IDsToDelete + "," + imageId;

                    string fileName = Convert.ToString(row["FileName"]);
                    NotUploadedFiles = (NotUploadedFiles == "")
                        ? fileName
                        : NotUploadedFiles + ", " + fileName;

                    // The row is marked as processed even on failure so it is not picked up again.
                    row["Is_Uploaded"] = true;
                    dt.AcceptChanges();
                }
                if (pbCtl != null)
                {
                    pbCtl.Is_Uploaded = true;
                    pbCtl.Is_WithError = true;
                }
            }
        }
        catch (Exception ex)
        {
            XtraMessageBox.Show(ex.Message.ToString(), Global.Header, MessageBoxButtons.OK);
        }
    }
    finally
    {
        sm.Release();
    }
}
Upvotes: 1
Reputation: 275
It sounds like a producer/consumer queue is the structure you are looking for. Take a look at this answer and the others in the thread for examples of how to employ it.
Upvotes: 0