Reputation: 3051
I have a data processing job that consists of about 20 sequential steps. The steps all fall under one of three categories:
I've refactored the code from one long, awful looking method to a pipeline pattern, using examples here and here. All of the steps are TransformBlock, such as
var stepThirteenPostToWebApi = new TransformBlock<FileInfo, System.Guid>(async csv =>
{
dynamic task = await ApiUtils.SubmitData(csv.FullName);
return task.guid;
});
The code works most of the time, but occasionally a step in the pipeline fails for whatever reason - let's say a corrupt file can't be read in step 6 of 20 (just an example - any step could fail). The pipeline stops running further tasks, as it should.
However, the 3rd party web API introduces a challenge - we are charged for each job we initiate whether we execute all 20 steps or just the first one.
I would like to be able to fix whatever went wrong in the problem step (again, for our example let's say I fix the corrupt file in step 6 of 20), then pick back up at step 6. The 3rd party web API has a GUID for each job, and is asynchronous, so that should be fine - after the problem is fixed, it will happily let a job resume with remaining steps.
My question: Is it possible (and if so advisable?) to design a pipeline that could begin at any step, assuming the pre-requisites for that step were valid?
It would look something like:
I realize a brute-force way would be to have StartAtStep2()
, StartAtStep3()
, StartAtStep4()
methods. That doesn't seem like a good design, but I'm a bit new at this pattern so maybe that's acceptable.
Upvotes: 3
Views: 394
Reputation: 127573
The brute force way is not that bad, for example your above code would just need to be
bool StartAtStepThirteen(FileInfo csv)
{
return stepThirteenPostToWebApi.Post(csv);
}
The setup of the chain should be a separate method than the executing of the chain. You should save stepThirteenPostToWebApi
in a class level variable in a class that represent's the entire chain, the setup of the chain could be done in the class's constructor.
Here is a simple 3 step version of the process. When a error happens instead of faulting the task chain I log the error and pass null
along the chain for invalid entries. You could make that log method raise a event and then the user can decide what to do with the bad entry.
public class WorkChain
{
private readonly TransformBlock<string, FileInfo> stepOneGetFileInfo;
private readonly TransformBlock<FileInfo, System.Guid?> stepTwoPostToWebApi;
private readonly ActionBlock<System.Guid?> stepThreeDisplayIdToUser;
public WorkChain()
{
stepOneGetFileInfo = new TransformBlock<string, FileInfo>(new Func<string, FileInfo>(GetFileInfo));
stepTwoPostToWebApi = new TransformBlock<FileInfo, System.Guid?>(new Func<FileInfo, Task<Guid?>>(PostToWebApi));
stepThreeDisplayIdToUser = new ActionBlock<System.Guid?>(new Action<Guid?>(DisplayIdToUser));
stepOneGetFileInfo.LinkTo(stepTwoPostToWebApi, new DataflowLinkOptions() {PropagateCompletion = true});
stepTwoPostToWebApi.LinkTo(stepThreeDisplayIdToUser, new DataflowLinkOptions() {PropagateCompletion = true});
}
public void PostToStepOne(string path)
{
bool result = stepOneGetFileInfo.Post(path);
if (!result)
{
throw new InvalidOperationException("Failed to post to stepOneGetFileInfo");
}
}
public void PostToStepTwo(FileInfo csv)
{
bool result = stepTwoPostToWebApi.Post(csv);
if (!result)
{
throw new InvalidOperationException("Failed to post to stepTwoPostToWebApi");
}
}
public void PostToStepThree(Guid id)
{
bool result = stepThreeDisplayIdToUser.Post(id);
if (!result)
{
throw new InvalidOperationException("Failed to post to stepThreeDisplayIdToUser");
}
}
public void CompleteAdding()
{
stepOneGetFileInfo.Complete();
}
public Task Completion { get { return stepThreeDisplayIdToUser.Completion; } }
private FileInfo GetFileInfo(string path)
{
try
{
return new FileInfo(path);
}
catch (Exception ex)
{
LogGetFileInfoError(ex, path);
return null;
}
}
private async Task<Guid?> PostToWebApi(FileInfo csv)
{
if (csv == null)
return null;
try
{
dynamic task = await ApiUtils.SubmitData(csv.FullName);
return task.guid;
}
catch (Exception ex)
{
LogPostToWebApiError(ex, csv);
return null;
}
}
private void DisplayIdToUser(Guid? obj)
{
if(obj == null)
return;
Console.WriteLine(obj.Value);
}
}
Upvotes: 4