Tau

Reputation: 468

C# multithreading: some threads seem not to run

I have an item-processing application, and I want to process a number of items in parallel. However, some items appear not to be processed at all, while others are processed more than once. My code is below.

On each timer tick, a new thread is created. It builds an item list from the database, then marks every item as locked so that it won't be picked up on the next tick.

SqlConnection connection = GetConnection();
string selectCommand = my_select_command_where_IsLocked_is_null;
List<Item> itemList = new List<Item>();
using(SqlDataAdapter adapter = new SqlDataAdapter(selectCommand, connection))
{
  using(SqlCommandBuilder cbCommandBuilder = new SqlCommandBuilder(adapter))
  {
    DataTable dtItemStore = new DataTable();
    adapter.Fill(dtItemStore);
    int totalRows = dtItemStore.Rows.Count;
    if(totalRows > 0)
    {
      adapter.UpdateCommand = cbCommandBuilder.GetUpdateCommand();
      foreach(DataRow row in dtItemStore.Rows)
      {
        Item pickedItem = new Item();
        pickedItem.Key = row["Key"].ToString();
        // Set all Item properties
        itemList.Add(pickedItem);
        row["IsLocked"] = 1;
      }
    }
    adapter.Update(dtItemStore);
  }
}

Then it iterates through the list and starts a new thread for each element to process the item.

var keyList = "";
foreach(Item pickedItem in itemList)
{
  Thread.Sleep(15); // Just to delay a bit
  var newThread = new Thread(delegate() { ProcessItem(pickedItem); });
  newThread.Start();
  Logger.Log(" Spawn thread: " + pickedItem.Key + ", ThreadState: " + newThread.ThreadState);
  keyList = keyList + pickedItem.Key;
}
Logger.Log("Keys picked: " + keyList);

The item processor function unlocks the item when it finishes.

void ProcessItem(Item pickedItem)
{
  Logger.Log("Let's process !!! " + pickedItem.Key);
  // Item processing code

  UnlockItem(pickedItem); // unlock and write log
}

The problem is that after a while I realized there are too many locked items in my database, and the number keeps increasing. Logging at specific points gave me something like the following:

- Spawn thread: 20779205, ThreadState: Running
- Let's process !!! 20779205
-  Spawn thread: 20779206, ThreadState: Running
- Let's process !!! 20779206
-  Spawn thread: 20779207, ThreadState: Running
- Let's process !!! 20779207
-  Spawn thread: 20779208, ThreadState: Running <---Not processed
- Let's process !!! 20779209                    <---Duplicate 
-  Spawn thread: 20779209, ThreadState: Running
- Let's process !!! 20779209
-  Spawn thread: 20779211, ThreadState: Running <---Not processed
- Let's process !!! 20779213
-  Spawn thread: 20779213, ThreadState: Running
- Let's process !!! 20779228
-  Spawn thread: 20779228, ThreadState: Running
- Let's process !!! 20779231                   
- Let's process !!! 20779231                    <---Duplicate
-  Spawn thread: 20779231, ThreadState: Running
-  Spawn thread: 20779237, ThreadState: Running
- Keys picked: 20779205, 20779206, 20779207, 20779208, 20779209, 20779211,   20779213, 20779228, 20779231, 20779237, 
- Let's process !!! 20779237
- STOP processing. Possible duplicate for 20779209
- STOP processing. Possible duplicate for 20779231
- Unlock Key: 20779209, Row count:1
- Unlock Key: 20779231, Row count:1
- Unlock Key: 20779209, Row count:1
- Unlock Key: 20779237, Row count:1
- Unlock Key: 20779228, Row count:1
- Unlock Key: 20779206, Row count:1
- Unlock Key: 20779207, Row count:1
- Unlock Key: 20779205, Row count:1
- Unlock Key: 20779231, Row count:1

Here I can see that ProcessItem is sometimes not hit (20779208, 20779211). I can't understand why this happens, and after spending two days on it I am totally lost.

Is this typical in a multi-threaded environment? How can I handle it better? I have to make sure that every Item is processed once and only once.

I also noticed that the same Item is sometimes processed more than once (20779209, 20779231); I handle that in the ProcessItem function to avoid duplication. Is there a link between these two scenarios?

What am I doing wrong?

Upvotes: 5

Views: 172

Answers (3)

Andrei Bucurei

Reputation: 2458

Do you really need to spawn threads 'manually'? Leave this task to the framework:

itemList.AsParallel().ForAll((item) =>
{
    Logger.Log("Processing item ...");
    ProcessItem(item);
});

Now all you have to do is make sure the ProcessItem method is thread safe.

Edit

ForAll will block the current thread until the action has completed for every item in the collection.
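
As a rough sketch of that behaviour, the snippet below counts how often each key is visited. `ForAllDemo`, `CountProcessing`, and the plain string keys are hypothetical stand-ins for the question's `Item` list and `ProcessItem`; the counting is only there to make "once per item" observable.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class ForAllDemo
{
    // Hypothetical helper: returns how many times each key was visited.
    public static IDictionary<string, int> CountProcessing(IEnumerable<string> keys)
    {
        // Thread-safe map, because ForAll may run the action on several threads.
        var counts = new ConcurrentDictionary<string, int>();

        // ForAll returns only after the action has run for every element.
        keys.AsParallel().ForAll(key =>
            counts.AddOrUpdate(key, 1, (_, n) => n + 1));

        return counts;
    }
}
```

Calling `ForAllDemo.CountProcessing(new[] { "a", "b", "c" })` should yield a count of 1 for each distinct key, and the counts are complete by the time the call returns because ForAll has already finished.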

Upvotes: 0

ASh

Reputation: 35733

From the output line "Keys picked: 20779205, 20779206, 20779207, 20779208, 20779209, 20779211, 20779213, 20779228, 20779231, 20779237" I would conclude that all items were picked (once each).

Maybe you should modify your code to:

var keyList = "";
foreach(Item pickedItem in itemList)
{
  Thread.Sleep(15); // Just to delay a bit
  Item tmpItem = pickedItem; // per-iteration copy: the same variable should not be accessed by many threads
  var newThread = new Thread(delegate() { ProcessItem(tmpItem); });
  newThread.Start();
  Logger.Log(" Spawn thread: " + pickedItem.Key + ", ThreadState: " + newThread.ThreadState);
  keyList = keyList + pickedItem.Key;
}
Logger.Log("Keys picked: " + keyList);

Upvotes: 2

Bioukh

Reputation: 1968

Your log shows that pickedItem is assigned to the next value by the foreach loop before your delegate() { ProcessItem(pickedItem); } is executed.
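
The effect can be reproduced deterministically without any threads. The sketch below is only an illustration: `CaptureDemo` and its two methods are hypothetical names, and it uses a `for` loop because its loop variable is shared across iterations in every C# version (since C# 5, a `foreach` variable is fresh per iteration, so the question's symptom points to an older compiler).

```csharp
using System;
using System.Collections.Generic;

public static class CaptureDemo
{
    // Every delegate captures the SAME variable i, not its value at creation time.
    public static List<int> SharedCapture(int count)
    {
        var actions = new List<Func<int>>();
        for (int i = 0; i < count; i++)
        {
            actions.Add(() => i);
        }

        // The delegates run after the loop has ended, when i == count.
        var results = new List<int>();
        foreach (var action in actions) results.Add(action());
        return results;
    }

    // Copying into a local declared inside the loop gives each delegate its own variable.
    public static List<int> CopiedCapture(int count)
    {
        var actions = new List<Func<int>>();
        for (int i = 0; i < count; i++)
        {
            int copy = i;
            actions.Add(() => copy);
        }

        var results = new List<int>();
        foreach (var action in actions) results.Add(action());
        return results;
    }
}
```

`CaptureDemo.SharedCapture(3)` returns 3, 3, 3, while `CaptureDemo.CopiedCapture(3)` returns 0, 1, 2. With real threads the delegate may run before or after the loop advances, which would explain why some keys are skipped and others appear twice.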

You should use ParameterizedThreadStart instead, or add synchronisation between the foreach loop and the ProcessItem function.
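
A minimal sketch of the first option, passing the item as the thread's start argument so the delegate does not close over the loop variable at all. `ParameterizedStartDemo`, `ProcessAll`, and the string keys are hypothetical stand-ins for the question's `Item` and `ProcessItem`; the `ConcurrentBag` exists only so the result can be inspected.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class ParameterizedStartDemo
{
    // Hypothetical helper: starts one thread per key and returns the keys that were processed.
    public static List<string> ProcessAll(IEnumerable<string> keys)
    {
        var processed = new ConcurrentBag<string>();
        var threads = new List<Thread>();

        foreach (string key in keys)
        {
            // The delegate receives its item through the state argument,
            // so it never reads the loop variable.
            var thread = new Thread(new ParameterizedThreadStart(
                state => processed.Add((string)state)));
            thread.Start(key); // the value is handed over at start time
            threads.Add(thread);
        }

        // Wait for every worker before reading the results.
        foreach (var worker in threads) worker.Join();

        var result = new List<string>(processed);
        result.Sort(StringComparer.Ordinal);
        return result;
    }
}
```

In the question's code the equivalent would be a delegate that casts its `object` argument back to `Item` and calls `ProcessItem`, started with `newThread.Start(pickedItem)`.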

Upvotes: 0
