Reputation: 468
I have an item-processing application, and I want to process a number of items in parallel. However, sometimes an item is not processed at all, and sometimes an item is processed more than once. My code is below.
On a timer tick, a new thread is created which prepares an item list from the database, then locks every item so that it won't be picked up in the next loop.
SqlConnection connection = GetConnection();
string selectCommand = my_select_command_where_IsLocked_is_null;
List<Item> itemList = new List<Item>();
using(SqlDataAdapter adapter = new SqlDataAdapter(selectCommand, connection))
{
using(SqlCommandBuilder cbCommandBuilder = new SqlCommandBuilder(adapter))
{
DataTable dtItemStore = new DataTable();
adapter.Fill(dtItemStore);
int totalRows = dtItemStore.Rows.Count;
if(totalRows > 0)
{
adapter.UpdateCommand = cbCommandBuilder.GetUpdateCommand();
foreach(DataRow row in dtItemStore.Rows)
{
Item pickedItem = new Item();
pickedItem.Key = row["Key"].ToString();
// Set all Item properties
itemList.Add(pickedItem);
row["IsLocked"] = 1;
}
}
adapter.Update(dtItemStore);
}
}
Then it iterates through the list, and fires a new thread for each element to process the item.
var keyList = "";
foreach(Item pickedItem in itemList)
{
Thread.Sleep(15); // Just to delay a bit
var newThread = new Thread(delegate() { ProcessItem(pickedItem); });
newThread.Start();
Logger.Log(" Spawn thread: " + pickedItem.Key + ", ThreadState: " + newThread.ThreadState);
keyList = keyList + pickedItem.Key;
}
Logger.Log("Keys picked: " + keyList);
The item processor function unlocks the item when it has finished processing.
void ProcessItem(Item pickedItem)
{
Logger.Log("Let's process !!! " + pickedItem.Key);
// Item processing code
UnlockItem(pickedItem); // unlock and write log
}
The problem is that after a while I realized there are too many locked items in my database, and the number keeps increasing. Logging at specific points, I got something like the following:
- Spawn thread: 20779205, ThreadState: Running
- Let's process !!! 20779205
- Spawn thread: 20779206, ThreadState: Running
- Let's process !!! 20779206
- Spawn thread: 20779207, ThreadState: Running
- Let's process !!! 20779207
- Spawn thread: 20779208, ThreadState: Running <---Not processed
- Let's process !!! 20779209 <---Duplicate
- Spawn thread: 20779209, ThreadState: Running
- Let's process !!! 20779209
- Spawn thread: 20779211, ThreadState: Running <---Not processed
- Let's process !!! 20779213
- Spawn thread: 20779213, ThreadState: Running
- Let's process !!! 20779228
- Spawn thread: 20779228, ThreadState: Running
- Let's process !!! 20779231
- Let's process !!! 20779231 <---Duplicate
- Spawn thread: 20779231, ThreadState: Running
- Spawn thread: 20779237, ThreadState: Running
- Keys picked: 20779205, 20779206, 20779207, 20779208, 20779209, 20779211, 20779213, 20779228, 20779231, 20779237,
- Let's process !!! 20779237
- STOP processing. Possible duplicate for 20779209
- STOP processing. Possible duplicate for 20779231
- Unlock Key: 20779209, Row count:1
- Unlock Key: 20779231, Row count:1
- Unlock Key: 20779209, Row count:1
- Unlock Key: 20779237, Row count:1
- Unlock Key: 20779228, Row count:1
- Unlock Key: 20779206, Row count:1
- Unlock Key: 20779207, Row count:1
- Unlock Key: 20779205, Row count:1
- Unlock Key: 20779231, Row count:1
Here I can see that sometimes ProcessItem is not hit (20779208, 20779211). I can't understand why this happens; after spending two days on this, I am totally lost.
Is this something typical in a multi-threaded environment? How can I handle this better? I have to make sure that every Item is processed once and only once.
Also, I realized that sometimes the same Item is processed more than once (20779209, 20779231); however, I handle that in the ProcessItem function to avoid duplication. Is there a link between these two scenarios?
What am I doing wrong?
Upvotes: 5
Views: 172
Reputation: 2458
Do you really need to spawn threads 'manually'? Leave this task to the framework:
itemList.AsParallel().ForAll((item) =>
{
Logger.Log("Processing item ...");
ProcessItem(item);
});
Now all you have to do is make sure the ProcessItem method is thread safe.
ForAll will block the current thread until the action has completed for each item in the collection.
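As a rough illustration of what "thread safe" can mean here, the sketch below runs ForAll over a list of keys and records each one in a ConcurrentBag, so that parallel writes do not corrupt the result. The class name ForAllDemo, the method ProcessAll, and the use of plain string keys in place of Item are assumptions made only for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class ForAllDemo
{
    // Hypothetical stand-in for the real work: records each key exactly once.
    public static List<string> ProcessAll(IEnumerable<string> keys)
    {
        // ConcurrentBag allows Add from several threads without extra locking.
        var processed = new ConcurrentBag<string>();

        // ForAll returns only after the action has run for every element.
        keys.AsParallel().ForAll(key => processed.Add(key));

        // Sort so the result does not depend on scheduling order.
        return processed.OrderBy(k => k, StringComparer.Ordinal).ToList();
    }

    public static void Main()
    {
        var result = ProcessAll(new[] { "20779205", "20779206", "20779207" });
        Console.WriteLine(string.Join(", ", result));
    }
}
```

Because the lambda receives each element as its own parameter, no loop variable is shared between the parallel invocations.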
Upvotes: 0
Reputation: 35733
From the output line Keys picked: 20779205, 20779206, 20779207, 20779208, 20779209, 20779211, 20779213, 20779228, 20779231, 20779237
I would conclude that all items were hit (once each).
Maybe you should modify your code to:
var keyList = "";
foreach(Item pickedItem in itemList)
{
Thread.Sleep(15); // Just to delay a bit
Item tmpItem = pickedItem; // the same variable should not be accessed by many threads
var newThread = new Thread(delegate() { ProcessItem(tmpItem); });
newThread.Start();
Logger.Log(" Spawn thread: " + pickedItem.Key + ", ThreadState: " + newThread.ThreadState);
keyList = keyList + pickedItem.Key;
}
Logger.Log("Keys picked: " + keyList);
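The effect of the local copy can be tried in isolation with a sketch like the one below. It uses a for loop, whose variable is shared by every closure, and copies the current element into a fresh local before starting the thread. The names CaptureDemo and ProcessWithLocalCopy, and the string keys standing in for Item, are assumptions for illustration only. Note that from C# 5 onward the foreach variable is already scoped per iteration, so the copy mainly matters with older compilers and with for loops.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class CaptureDemo
{
    // Starts one thread per key; each thread captures its own local copy.
    public static List<string> ProcessWithLocalCopy(string[] keys)
    {
        var seen = new List<string>();
        var threads = new List<Thread>();

        for (int i = 0; i < keys.Length; i++)
        {
            // Declared inside the loop body, so every iteration gets a new variable.
            // Capturing keys[i] directly would read the shared counter i later.
            string key = keys[i];

            var t = new Thread(() =>
            {
                lock (seen) { seen.Add(key); }
            });
            threads.Add(t);
            t.Start();
        }

        // Wait for all workers before reading the results.
        foreach (Thread t in threads) t.Join();

        seen.Sort(StringComparer.Ordinal);
        return seen;
    }

    public static void Main()
    {
        var result = ProcessWithLocalCopy(new[] { "20779205", "20779206", "20779207" });
        Console.WriteLine(string.Join(", ", result)); // 20779205, 20779206, 20779207
    }
}
```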
Upvotes: 2
Reputation: 1968
Your log shows that pickedItem is assigned the next value by the foreach loop before your delegate() { ProcessItem(pickedItem); } is executed.
You should use ParameterizedThreadStart instead, or add synchronization between the foreach loop and the ProcessItem function.
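A minimal sketch of the ParameterizedThreadStart approach follows. The value is handed to the thread through Thread.Start(object) rather than captured by a closure, so a later loop iteration cannot change what the thread sees. The class ParameterizedStartDemo, the Run method, and the string keys used in place of Item are hypothetical names chosen for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class ParameterizedStartDemo
{
    private static readonly object Gate = new object();
    private static readonly List<string> Processed = new List<string>();

    // Matches the ParameterizedThreadStart signature: void (object).
    private static void ProcessItem(object state)
    {
        string key = (string)state;
        lock (Gate) { Processed.Add(key); }
    }

    public static List<string> Run(IEnumerable<string> keys)
    {
        lock (Gate) { Processed.Clear(); }
        var threads = new List<Thread>();

        foreach (string key in keys)
        {
            var t = new Thread(new ParameterizedThreadStart(ProcessItem));
            t.Start(key); // the argument is fixed at the moment Start is called
            threads.Add(t);
        }

        // Wait for every worker to finish before reading the results.
        foreach (Thread t in threads) t.Join();

        lock (Gate)
        {
            var copy = new List<string>(Processed);
            copy.Sort(StringComparer.Ordinal);
            return copy;
        }
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(new[] { "20779208", "20779209" })));
    }
}
```

The cost of this approach is the cast from object inside the worker method; the local-copy closure keeps static typing instead.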
Upvotes: 0