Lawrence Phillips
Lawrence Phillips

Reputation: 289

C# Parallel.For not executing every step

I have been working on a mock-up for an import service which currently runs in sequence. However, my mock-up seems to exhibit a strange problem whereby sometimes one or two items in the for loop aren't executed.

/// <summary>
/// Runs imports on a dedicated background thread. The worker polls the
/// repository for an import to process and fans each one out over
/// <c>Parallel.For</c>; <see cref="Stop"/> cancels any in-flight import and
/// waits for the worker to exit.
/// </summary>
class Service
{
    private Thread _worker;

    // volatile: written by Stop() on the caller's thread and polled by the
    // worker loop, so the read must not be cached or hoisted out of the loop.
    private volatile bool _stopping;

    private CancellationTokenSource _cts;
    private ParallelOptions _po;
    private Repository _repository;

    /// <summary>
    /// Starts the background worker thread.
    /// </summary>
    /// <param name="repository">Source of imports and batches to process.</param>
    public void Start(Repository repository)
    {
        _repository = repository;

        // Reset the flag so the service can be started again after a Stop().
        _stopping = false;

        _cts = new CancellationTokenSource();
        _po = new ParallelOptions
        {
            CancellationToken = _cts.Token
        };

        _worker = new Thread(ProcessImport);
        _worker.Start();
    }

    /// <summary>
    /// Signals the worker to stop, cancels the running import (if any) and
    /// blocks until the worker thread has exited. Safe to call before
    /// <see cref="Start"/> and safe to call more than once.
    /// </summary>
    public void Stop()
    {
        _stopping = true;

        if (_cts != null)
            _cts.Cancel();

        if (_worker != null && _worker.IsAlive)
            _worker.Join();

        // The worker has exited, so nothing observes the token any more.
        if (_cts != null)
        {
            _cts.Dispose();
            _cts = null;
        }
    }

    /// <summary>
    /// Worker loop: fetches the next import, processes its 1000 work items in
    /// parallel, and sleeps for a second when there is nothing to do.
    /// </summary>
    private void ProcessImport()
    {
        // Capture once so the loop body never dereferences _cts directly.
        var token = _po.CancellationToken;

        while (!_stopping)
        {
            var import = _repository.GetInProgressImport();
            if (import == null)
            {
                Thread.Sleep(1000);
                continue;
            }

            try
            {
                Parallel.For(0, 1000, _po, i => Work.DoWork(i, import, token, _repository));
            }
            catch (OperationCanceledException)
            {
                // Unmark batch so it can be started again
                var batch = _repository.GetBatch(import.BatchId);
                batch.Processing = false;
                _repository.UpdateBatch(batch);
                Console.WriteLine("Aborted import {0}", import.ImportId);
            }
            catch (Exception ex)
            {
                // NOTE: the batch is left marked as Processing on this path, so
                // GetInProgressImport() will not hand it out again.
                Console.WriteLine("Something went wrong: {0}", ex.Message);
            }
        }
    }
}

// Simulated unit of work for a single iteration of an import.
// Service.ProcessImport calls DoWork from inside Parallel.For, so several
// invocations run concurrently against the same Repository and Batch.
class Work
{
    // Performs one simulated work item and then records it on the batch.
    //   i          - loop index supplied by Parallel.For (not used by the body)
    //   import     - the import being processed; only its BatchId is read
    //   ct         - token checked between the simulated work steps
    //   repository - used to read the batch and write it back
    // Throws OperationCanceledException (via HandleAbort) when cancellation
    // has been requested.
    public static void DoWork(int i, Import import, CancellationToken ct, Repository repository)
    {         
        // Simulate doing some work: three 100 ms steps with a cancellation
        // check after the first and second step
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);

        // Update the batch
        // NOTE(review): nothing synchronizes this read-increment-write. Because
        // DoWork runs concurrently, two threads can read the same Processed
        // value and one increment is lost, which can also cause the
        // Processed == Total check below to never become true.
        var batch = repository.GetBatch(import.BatchId);
        batch.Processed++;
        if (batch.Processed == batch.Total)
        {
            // Last item of the batch: stamp the finish time and release it
            batch.Finished = DateTime.Now;
            batch.Processing = false;                
        }            
        repository.UpdateBatch(batch);            
    }

    // Throws OperationCanceledException if cancellation has been requested on
    // ct; otherwise returns. The explicit IsCancellationRequested check is
    // redundant, since ThrowIfCancellationRequested performs the same test.
    private static void HandleAbort(CancellationToken ct)
    {
        if (!ct.IsCancellationRequested) 
            return;
        ct.ThrowIfCancellationRequested();
    }
}

With this code, I often find that the batches are never complete and that batch.Processed = 999 or 998.

Can anyone shed any light on what I've done wrong?

Thanks in advance.

Edit:

To be clear about the repository/batch object - I believe in my current mock-up that it is threadsafe

/// <summary>
/// In-memory store for imports and batches (no database or other persistence).
/// The <see cref="ConcurrentBag{T}"/> fields make adding to and enumerating the
/// collections thread-safe, but they do NOT protect the fields of the
/// <c>Batch</c> objects stored inside them: callers that read-modify-write a
/// batch from several threads must synchronize that themselves.
/// </summary>
class Repository
{
    private readonly ConcurrentBag<Batch> _batchData = new ConcurrentBag<Batch>();
    private readonly ConcurrentBag<Import> _importData = new ConcurrentBag<Import>();

    /// <summary>Adds an import to the store.</summary>
    public void CreateImport(Import import)
    {
        _importData.Add(import);
    }

    /// <summary>
    /// Picks the next import to work on and marks its batch as processing.
    /// Candidates are imports whose batch is incomplete
    /// (<c>Processed &lt; Total</c>) and not already marked as processing; the
    /// one with the most remaining items wins, ties going to the lowest
    /// <c>BatchId</c>.
    /// </summary>
    /// <returns>The selected import, or <c>null</c> when there is no candidate.</returns>
    /// <remarks>
    /// Selecting and marking are two separate steps, so two threads calling
    /// this concurrently could both receive the same import.
    /// </remarks>
    public Import GetInProgressImport()
    {
        var import = _importData
            .Join(_batchData, i => i.BatchId, b => b.BatchId, (i, b) => new
            {
                Import = i,
                Batch = b
            })
            .Where(j => j.Batch.Processed < j.Batch.Total && !j.Batch.Processing)
            .OrderByDescending(j => j.Batch.Total - j.Batch.Processed)
            // The original key "BatchId - BatchId" was always 0, so it never
            // broke a tie; order by the id itself for a deterministic pick.
            .ThenBy(j => j.Batch.BatchId)
            .Select(j => j.Import)
            .FirstOrDefault();

        if (import == null)
            return null;

        // mark the batch as processing
        var batch = GetBatch(import.BatchId);
        batch.Processing = true;
        UpdateBatch(batch);

        return import;
    }

    /// <summary>Returns a snapshot list of all imports in the store.</summary>
    public List<Import> ListImports()
    {
        return _importData.ToList();
    }

    /// <summary>Adds a batch to the store.</summary>
    public void CreateBatch(Batch batch)
    {
        _batchData.Add(batch);
    }

    /// <summary>
    /// Returns the stored batch with the given id, or <c>null</c> if there is
    /// none. The returned object is the stored instance itself, not a copy.
    /// </summary>
    public Batch GetBatch(Int64 batchId)
    {
        return _batchData.FirstOrDefault(b => b.BatchId == batchId);
    }

    /// <summary>
    /// Copies the mutable fields of <paramref name="batch"/> onto the stored
    /// batch with the same id. When <paramref name="batch"/> was obtained from
    /// <see cref="GetBatch"/> it is the stored instance, so this copies the
    /// object onto itself.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// No stored batch has the given <c>BatchId</c>.
    /// </exception>
    public void UpdateBatch(Batch batch)
    {
        var batchData = _batchData.First(b => b.BatchId == batch.BatchId);
        batchData.Total = batch.Total;
        batchData.Processed = batch.Processed;
        batchData.Started = batch.Started;
        batchData.Finished = batch.Finished;
        batchData.Processing = batch.Processing;
    }
}

/// <summary>
/// An import job. It is associated with a batch through <see cref="BatchId"/>.
/// </summary>
class Import
{
    /// <summary>Identifier of this import.</summary>
    public long ImportId { get; set; }

    /// <summary>Identifier of the batch this import belongs to.</summary>
    public long BatchId { get; set; }
}

/// <summary>
/// Progress record for a group of work items belonging to an import.
/// </summary>
class Batch
{
    /// <summary>Identifier of this batch.</summary>
    public long BatchId { get; set; }

    /// <summary>Number of work items the batch contains.</summary>
    public int Total { get; set; }

    /// <summary>Number of work items recorded as done so far.</summary>
    public int Processed { get; set; }

    /// <summary>Creation timestamp.</summary>
    public DateTime Created { get; set; }

    /// <summary>Start timestamp.</summary>
    public DateTime Started { get; set; }

    /// <summary>Completion timestamp.</summary>
    public DateTime Finished { get; set; }

    /// <summary>True while the batch is claimed for processing.</summary>
    public bool Processing { get; set; }
}

This is only a mock-up so there is no DB or other persistence behind my repository.

Also, I'm not completing my batch on the value of i, but rather on the number of iterations of the loop (the work actually having been done), indicated by the Processed property of the batch object.

Thanks

Solution:

I had forgotten about the need to synchronise the update of the batch. It should look like:

/// <summary>
/// Simulated unit of work for a single iteration of an import. The batch
/// update is serialized with a lock so that concurrent callers cannot lose
/// increments of <c>Processed</c>.
/// </summary>
class Work
{
    // readonly: a lock target must never be reassigned, otherwise threads
    // could end up locking on different objects.
    private static readonly object _sync = new object();

    /// <summary>
    /// Performs one simulated work item and records it on the batch.
    /// </summary>
    /// <param name="i">Loop index of the work item (not used by the body).</param>
    /// <param name="import">Import being processed; only its BatchId is read.</param>
    /// <param name="ct">Token checked between the simulated work steps.</param>
    /// <param name="repository">Used to read the batch and write it back.</param>
    /// <exception cref="OperationCanceledException">
    /// Cancellation was requested on <paramref name="ct"/>.
    /// </exception>
    public static void DoWork(int i, Import import, CancellationToken ct, Repository repository)
    {
        // Do work: three 100 ms steps with a cancellation check after the
        // first and second step
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);

        // The increment, the completion check and the write-back must happen
        // as one atomic step, so a plain Interlocked.Increment is not enough.
        lock (_sync)
        {
            // Update the batch
            var batch = repository.GetBatch(import.BatchId);
            batch.Processed++;
            if (batch.Processed == batch.Total)
            {
                batch.Finished = DateTime.Now;
                batch.Processing = false;
            }
            repository.UpdateBatch(batch);
        }
    }

    /// <summary>
    /// Throws <see cref="OperationCanceledException"/> if cancellation has been
    /// requested on <paramref name="ct"/>; otherwise does nothing.
    /// </summary>
    private static void HandleAbort(CancellationToken ct)
    {
        // ThrowIfCancellationRequested already checks IsCancellationRequested.
        ct.ThrowIfCancellationRequested();
    }
}

Upvotes: 1

Views: 330

Answers (2)

usr
usr

Reputation: 171246

Looks like lost updates on batch.Processed. Increments are not atomic. batch.Processed++; is racy. Use Interlocked.Increment.

It seems to me like you don't have a good understanding of threading right now. It's very dangerous to perform such elaborate threading without a good understanding. The mistakes you make are hard to test for but production will find them.

Upvotes: 3

David
David

Reputation: 10708

According to MSDN, the overloads of Parallel.For specify the second integer as toExclusive, meaning the loop runs up to, but does not include, that value. In other words, 999 is the expected final index, not 1000 - but note also that, because it starts at 0, your loop does execute 1,000 times.

From a glance, your code is parallel, so make sure you're not seeing the "999" call out of order from the "998" one - this is because by being executed in parallel, your code is inherently unordered, and can easily end up being very randomly rearranged. Also, read up on lock, as your code may be accessing values which it should be waiting for.

Upvotes: 0

Related Questions