Gojira

Reputation: 3051

C# TPL: Possible to restart a failed Pipeline at an arbitrary step?

I have a data processing job that consists of about 20 sequential steps. The steps all fall under one of three categories:

  1. do some file manipulation
  2. import / export data from a database
  3. make a call to a 3rd party web API

I've refactored the code from one long, awful-looking method into a pipeline pattern, using examples here and here. All of the steps are TransformBlocks, for example:

var stepThirteenPostToWebApi = new TransformBlock<FileInfo, System.Guid>(async csv =>
{
    dynamic task = await ApiUtils.SubmitData(csv.FullName);
    return task.guid;
});

The code works most of the time, but occasionally a step in the pipeline fails for whatever reason - let's say a corrupt file can't be read in step 6 of 20 (just an example - any step could fail). The pipeline stops running further tasks, as it should.

However, the 3rd party web API introduces a challenge - we are charged for each job we initiate whether we execute all 20 steps or just the first one.

I would like to be able to fix whatever went wrong in the problem step (again, for our example let's say I fix the corrupt file in step 6 of 20), then pick back up at step 6. The 3rd party web API has a GUID for each job, and is asynchronous, so that should be fine - after the problem is fixed, it will happily let a job resume with remaining steps.

My question: Is it possible (and, if so, advisable) to design a pipeline that can begin at any step, assuming the prerequisites for that step are valid?

It would look something like:

  1. job fails on step 6 and logs step 5 as the last successful step
  2. a human comes along and fixes whatever caused step 6 to fail
  3. a new pipeline is started at step 6

I realize a brute-force way would be to have StartAtStep2(), StartAtStep3(), StartAtStep4() methods. That doesn't seem like a good design, but I'm a bit new at this pattern so maybe that's acceptable.
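The resume flow described in the question (log the last successful step, let a person fix the problem, start again at the next step) can be sketched without one StartAtStepN() method per step by keeping the steps in an ordered list and recording the index of the last step that succeeded. This is a minimal sketch that swaps TPL Dataflow for a plain sequential loop; JobState, ResumablePipeline and RunFromAsync are hypothetical names, and a real job would need to persist the returned index (and the 3rd party job GUID) somewhere durable.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical state shared by all steps (working files, the 3rd party job GUID, ...).
public class JobState
{
    public Guid? ApiJobId { get; set; }
    public List<string> Errors { get; } = new List<string>();
}

// Hypothetical resumable pipeline: steps run in order, the caller picks the start step.
public class ResumablePipeline
{
    private readonly List<Func<JobState, Task>> steps = new List<Func<JobState, Task>>();

    public int StepCount => steps.Count;

    public ResumablePipeline AddStep(Func<JobState, Task> step)
    {
        steps.Add(step);
        return this;
    }

    // Runs steps from startIndex (0-based) until one throws or all finish.
    // Returns the index of the last step that succeeded: this is the checkpoint
    // to persist, and the next run should start at (returned value + 1).
    public async Task<int> RunFromAsync(JobState state, int startIndex)
    {
        if (startIndex < 0 || startIndex > steps.Count)
            throw new ArgumentOutOfRangeException(nameof(startIndex));

        int lastSucceeded = startIndex - 1;
        for (int i = startIndex; i < steps.Count; i++)
        {
            try
            {
                await steps[i](state);
                lastSucceeded = i;
            }
            catch (Exception ex)
            {
                // Stop at the first failure, like the original pipeline, but remember where.
                state.Errors.Add($"Step {i} failed: {ex.Message}");
                break;
            }
        }
        return lastSucceeded;
    }
}
```

If the step at index 5 throws, RunFromAsync returns 4; after the cause is fixed, RunFromAsync(state, 5) resumes without re-running (and re-paying for) the earlier steps. This only works if every step can get its prerequisites from JobState rather than from the output of the previous step in the same run.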

Upvotes: 3

Views: 394

Answers (1)

Scott Chamberlain

Reputation: 127573

The brute-force way is not that bad. For example, with your code above it would just need to be:

bool StartAtStepThirteen(FileInfo csv) 
{ 
    return stepThirteenPostToWebApi.Post(csv); 
}
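To show that posting straight into a later block really does skip the earlier ones, here is a minimal, self-contained sketch of a three-block chain entered at step two. It assumes the System.Threading.Tasks.Dataflow library is referenced; the block names and string payloads are placeholders for the real steps, and the blocks are local variables only to keep the sketch short.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class MidChainDemo
{
    // Builds a three-step chain but enters it at step two, skipping step one entirely.
    public static async Task<string[]> RunFromStepTwoAsync(params string[] alreadyPreparedInputs)
    {
        var results = new ConcurrentQueue<string>();

        // Placeholder steps standing in for "read file", "post to web API", "report result".
        var stepOne = new TransformBlock<string, string>(path => "loaded:" + path);
        var stepTwo = new TransformBlock<string, string>(async loaded =>
        {
            await Task.Delay(1); // simulate an async call such as the web API
            return "posted:" + loaded;
        });
        var stepThree = new ActionBlock<string>(posted => results.Enqueue(posted));

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        stepOne.LinkTo(stepTwo, link);
        stepTwo.LinkTo(stepThree, link);

        // Equivalent of StartAtStepTwo(): post directly into the middle of the chain.
        foreach (var input in alreadyPreparedInputs)
        {
            if (!stepTwo.Post(input))
                throw new InvalidOperationException("stepTwo declined the message");
        }

        // Completing the head still drains the chain: stepTwo finishes its
        // buffered items before completion is propagated to stepThree.
        stepOne.Complete();
        await stepThree.Completion;
        return results.ToArray();
    }
}
```

Calling RunFromStepTwoAsync("a", "b") yields "posted:a" and "posted:b" with no "loaded:" prefix, because step one never saw those items.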

The setup of the chain should be separate from the execution of the chain. Store stepThirteenPostToWebApi in a class-level field of a class that represents the entire chain; the chain itself can be set up in that class's constructor.

Here is a simple three-step version of the process. When an error happens, instead of faulting the chain I log the error and pass null along the chain for the invalid entry. You could make that log method raise an event, and then the user can decide what to do with the bad entry.

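using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class WorkChain
{
    // Every step is stored in a field so that work can be posted to any step directly.
    private readonly TransformBlock<string, FileInfo> stepOneGetFileInfo;
    private readonly TransformBlock<FileInfo, Guid?> stepTwoPostToWebApi;
    private readonly ActionBlock<Guid?> stepThreeDisplayIdToUser;

    public WorkChain()
    {
        // Set up the chain once here; running it happens through the PostToStepX methods.
        stepOneGetFileInfo = new TransformBlock<string, FileInfo>(new Func<string, FileInfo>(GetFileInfo));
        stepTwoPostToWebApi = new TransformBlock<FileInfo, Guid?>(new Func<FileInfo, Task<Guid?>>(PostToWebApi));
        stepThreeDisplayIdToUser = new ActionBlock<Guid?>(new Action<Guid?>(DisplayIdToUser));

        // PropagateCompletion lets Complete() on step one flow through to step three.
        stepOneGetFileInfo.LinkTo(stepTwoPostToWebApi, new DataflowLinkOptions() { PropagateCompletion = true });
        stepTwoPostToWebApi.LinkTo(stepThreeDisplayIdToUser, new DataflowLinkOptions() { PropagateCompletion = true });
    }

    // Normal entry point: start at step one.
    public void PostToStepOne(string path)
    {
        bool result = stepOneGetFileInfo.Post(path);
        if (!result)
        {
            throw new InvalidOperationException("Failed to post to stepOneGetFileInfo");
        }
    }

    // Resume entry point: skip step one when its output is already known.
    public void PostToStepTwo(FileInfo csv)
    {
        bool result = stepTwoPostToWebApi.Post(csv);
        if (!result)
        {
            throw new InvalidOperationException("Failed to post to stepTwoPostToWebApi");
        }
    }

    // Resume entry point: skip steps one and two.
    public void PostToStepThree(Guid id)
    {
        bool result = stepThreeDisplayIdToUser.Post(id);
        if (!result)
        {
            throw new InvalidOperationException("Failed to post to stepThreeDisplayIdToUser");
        }
    }

    // Signals that no more input is coming; completion propagates down the chain.
    public void CompleteAdding()
    {
        stepOneGetFileInfo.Complete();
    }

    // Completes once the last step has processed everything.
    public Task Completion { get { return stepThreeDisplayIdToUser.Completion; } }

    private FileInfo GetFileInfo(string path)
    {
        try
        {
            return new FileInfo(path);
        }
        catch (Exception ex)
        {
            // Log and pass null along instead of faulting the whole chain.
            LogGetFileInfoError(ex, path);
            return null;
        }
    }

    private async Task<Guid?> PostToWebApi(FileInfo csv)
    {
        // A null input means an earlier step already failed and logged the error.
        if (csv == null)
            return null;
        try
        {
            // ApiUtils.SubmitData is the asker's own helper (not shown here).
            dynamic task = await ApiUtils.SubmitData(csv.FullName);
            return task.guid;
        }
        catch (Exception ex)
        {
            LogPostToWebApiError(ex, csv);
            return null;
        }
    }

    private void DisplayIdToUser(Guid? obj)
    {
        if (obj == null)
            return;

        Console.WriteLine(obj.Value);
    }

    // Placeholder logging hooks (bodies are an assumption); these could raise an
    // event instead, so the caller can decide what to do with the bad entry.
    private void LogGetFileInfoError(Exception ex, string path)
    {
        Console.Error.WriteLine("Step one failed for " + path + ": " + ex.Message);
    }

    private void LogPostToWebApiError(Exception ex, FileInfo csv)
    {
        Console.Error.WriteLine("Step two failed for " + csv.FullName + ": " + ex.Message);
    }
}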
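The suggestion to have the log method raise an event could look roughly like the following sketch: a step catches its own exception, raises a hypothetical StepFailed event carrying the step number and the failing input, and passes null downstream as WorkChain does. A subscriber can record the failure so that, once the input is fixed, it can be posted into a fresh chain at that step. StepFailedEventArgs, ObservableChain and the injected readFile delegate are illustrative names, not part of the answer's code, and the sketch assumes System.Threading.Tasks.Dataflow is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical event payload describing which step failed and on what input.
public class StepFailedEventArgs : EventArgs
{
    public StepFailedEventArgs(int step, object input, Exception error)
    {
        Step = step;
        Input = input;
        Error = error;
    }

    public int Step { get; }
    public object Input { get; }
    public Exception Error { get; }
}

public class ObservableChain
{
    // Raised instead of faulting the chain; subscribers decide what to do with the bad entry.
    public event EventHandler<StepFailedEventArgs> StepFailed;

    private readonly TransformBlock<string, string> stepOneReadFile;
    private readonly ActionBlock<string> stepTwoReport;

    public List<string> Reported { get; } = new List<string>();

    public ObservableChain(Func<string, string> readFile)
    {
        stepOneReadFile = new TransformBlock<string, string>(path =>
        {
            try
            {
                return readFile(path);
            }
            catch (Exception ex)
            {
                StepFailed?.Invoke(this, new StepFailedEventArgs(1, path, ex));
                return null; // pass null along, as WorkChain does
            }
        });
        // ActionBlock runs one item at a time by default, so the List is not shared concurrently.
        stepTwoReport = new ActionBlock<string>(contents =>
        {
            if (contents != null)
                Reported.Add(contents);
        });
        stepOneReadFile.LinkTo(stepTwoReport, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public bool PostToStepOne(string path) => stepOneReadFile.Post(path);

    public void CompleteAdding() => stepOneReadFile.Complete();

    public Task Completion => stepTwoReport.Completion;
}
```

The event is raised from inside the block's delegate, so handlers run on a thread-pool thread; keeping them short and thread-safe (for example, only recording the failed input) avoids slowing the chain down.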

Upvotes: 4
