Reputation: 11741
I have a step function that starts with many parallel steps (each parallel step is a lambda invocation) and then a finalize step that does some final processing.
It can be visualized here (a redacted version of the state machine definition is given below as well). I know you can add try/catch logic around the parallel steps, but if my understanding is correct, that doesn't stop the other parallel steps from continuing and doesn't send them all to a different state.
Ideally, if any of the parallel steps fails for any reason, all currently running steps would be cancelled (as well as future ones), and execution would never reach the Finalize stage but would instead go to a third state (call it Error Recovery) for a different execution path. Is this workflow possible? And if so, is it guaranteed that all the Parallel Steps will have stopped before the Recovery state is entered?
Step Function Definition
{
  "Comment": "An example of the Amazon States Language using a map state to process elements of an array with a max concurrency of 2.",
  "StartAt": "Map",
  "States": {
    "Map": {
      "Type": "Map",
      "ItemsPath": "$.items",
      "Parameters": {
        ...
      },
      "MaxConcurrency": 2,
      "Next": "Finalize",
      "Iterator": {
        "StartAt": "Parallel Step",
        "States": {
          "Parallel Step": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
              "FunctionName": "arn:aws:lambda:us-east-1:<>:function:lambda-parallel-step:$LATEST",
              "Payload": {
                "Input.$": "$"
              }
            },
            "OutputPath": "$.Payload",
            "End": true
          }
        }
      }
    },
    "Finalize": {
      "Type": "Pass",
      "End": true
    }
  }
}
Upvotes: 7
Views: 6985
Reputation: 11741
The answer was simpler than I thought when I came back to this. You can put a Catch on the entire Map state shown above. If anything inside it raises an uncaught exception, execution moves to whatever state that Catch indicates.
Here is a slightly modified version of my definition:
{
  "Comment": "Pipeline to read data from S3 and index into Elasticsearch",
  "StartAt": "Map",
  "States": {
    "Map": {
      "Type": "Map",
      "ItemsPath": "$.items",
      "Parameters": {
        ...
      },
      "ResultPath": "$.parallel-output",
      "MaxConcurrency": 6,
      "Next": "Finalize",
      "Iterator": {
        "StartAt": "Parallel",
        "States": {
          "Parallel": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
              "FunctionName": "arn:aws:lambda:us-east-1:<>:function:parallel:$LATEST",
              "Payload": {
                "Input.$": "$"
              }
            },
            "OutputPath": "$.Payload",
            "End": true
          }
        }
      },
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.error-info",
          "Next": "Cleanup State"
        }
      ]
    },
    "Finalize": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:us-east-1:<>:function:finalize:$LATEST",
        "Payload": {
          "Input.$": "$"
        }
      },
      "End": true
    },
    "Cleanup State": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:us-east-1:<>:function:cleanup:$LATEST",
        "Payload": {
          "Input.$": "$"
        }
      },
      "Next": "Fail State"
    },
    "Fail State": {
      "Type": "Fail",
      "Error": "ErrorCode",
      "Cause": "Caused By Message"
    }
  }
}
This example handles every failure the same way, but the documentation illustrates how you can do much more sophisticated error handling for different types of errors, retries, etc.
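As a rough illustration of the retry side of that, here is a sketch of the inner "Parallel" task with a Retry block added. The choice of error names and the interval, attempt count, and backoff values are my own assumptions for illustration, not something from my real pipeline. Retry, ErrorEquals, IntervalSeconds, MaxAttempts, and BackoffRate are standard Amazon States Language fields. Once the retries are exhausted, the error is no longer handled inside the iteration, so it should propagate up to the Catch on the Map.

```json
{
  "Parallel": {
    "Comment": "Illustrative only: the retry values below are assumptions, tune them for your workload",
    "Type": "Task",
    "Resource": "arn:aws:states:::lambda:invoke",
    "Parameters": {
      "FunctionName": "arn:aws:lambda:us-east-1:<>:function:parallel:$LATEST",
      "Payload": {
        "Input.$": "$"
      }
    },
    "OutputPath": "$.Payload",
    "Retry": [
      {
        "ErrorEquals": ["Lambda.ServiceException", "Lambda.TooManyRequestsException"],
        "IntervalSeconds": 2,
        "MaxAttempts": 3,
        "BackoffRate": 2.0
      }
    ],
    "End": true
  }
}
```

The same idea applies to Catch: you can list several catchers with specific error names and send each to its own state, keeping a "States.ALL" entry last as the fallback.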
And the DAG for the full state machine looks like this: [image not included]
Upvotes: 10