Reputation: 65
We are using Step functions dynamic parallelism with Map state to achieve concurrency. Is it possible to pass the values to "MaxConcurrency" field from its upstream task(lambda or read from file ) in map state stepfunctions .
Current code:
"Type": "Map",
"InputPath": "$.detail",
"ItemsPath": "$.shipped",
"MaxConcurrency": 3,
"ResultPath": "$.detail.shipped",
Expectation(pass input to MaxConcurrency from lambda task or read file task ):
"Type": "Map",
"InputPath": "$.detail",
"ItemsPath": "$.shipped",
"MaxConcurrency": "$.input",
"ResultPath": "$.detail.shipped"
Getting error as it supports only integer.
Upvotes: 2
Views: 3202
Reputation: 242
Not sure when this parameter was added but the Map step has an optional MaxConcurrencyPath
parameter. I was able to get it to work in my stack with something like this
JoinLoop:
Type: Map
End: True
MaxConcurrencyPath: $.Concurrency
where concurrency is set in the previous step.
Upvotes: 0
Reputation: 25799
You can set maxConcurrency
at state machine definition-time, but not at execution-time. As you experienced, Map's maxConcurrency
expects a number
, but the state machine language uses strings
to pass variables dynamically.
(Note: Step Functions are concurrent by default. Docs: maxConcurrency
's "default value is 0, which places no quota on parallelism and iterations are invoked as concurrently as possible".)
A workaround that limits concurrency dynamically at execution-time is a Choice state that branches to discrete map states based on an input variable. Each branch's Map has a different maxConcurrency
, but is otherwise identical. Add whichever discrete maxConcurrency
choices you need. Choice also accepts a default case to catch unmatched choice input.
// execution input
{
"concurrency": 5,
"jobs": [ { "jobId": 1 }, { "jobId": 2 }, { "jobId": 3 }, { "jobId": 4}]
}
// state machine definition (partial)
"States": {
"Max-Concurrency-Choice": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.concurrency",
"NumericEquals": 5,
"Next": "MapState-MaxConcurrency-5" // maxConcurrency in this branch is set at 5
},
{
"Variable": "$.concurrency",
"NumericEquals": 10,
"Next": "MapState-MaxConcurrency-10" // maxConcurrency in this branch is set at 10
}
],
"Default": "MapState-MaxConcurrency-1" // maxConcurrency in the default branch is set at 1
},
Nest your Sfn in a new Sfn. The new, parent Sfn takes a maxConcurrency
in the input. It has two tasks:
maxConurrency
.Upvotes: 1