codek

Reputation: 65

AWS Step Function - Dynamic parallelism MaxConcurrency Field

We are using Step Functions dynamic parallelism with a Map state to achieve concurrency. Is it possible to pass the value of the "MaxConcurrency" field in a Map state from an upstream task (a Lambda, or a task that reads from a file)?

Current code:

"Type": "Map",
"InputPath": "$.detail",
"ItemsPath": "$.shipped",
"MaxConcurrency": 3,
"ResultPath": "$.detail.shipped",

Expectation (pass the MaxConcurrency value from a Lambda task or a read-file task):

 "Type": "Map",
 "InputPath": "$.detail",
 "ItemsPath": "$.shipped",
 "MaxConcurrency": "$.input",
 "ResultPath": "$.detail.shipped"

This fails with an error because MaxConcurrency accepts only an integer.

Upvotes: 2

Views: 3202

Answers (2)

Brian

Reputation: 242

Not sure when this parameter was added, but the Map state has an optional MaxConcurrencyPath parameter. I was able to get it to work in my stack with something like this:

JoinLoop:
    Type: Map
    End: True
    MaxConcurrencyPath: $.Concurrency

where Concurrency is set by the previous step.
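A minimal sketch of that setup in Python, under assumptions: the previous step is a hypothetical Lambda (lambda_handler, reading an assumed requested_concurrency input field) that copies the limit into $.Concurrency, and build_definition emits an illustrative ASL definition whose Map state reads the limit through MaxConcurrencyPath. The function ARN is a placeholder and the Pass state stands in for the real per-item work.

```python
import json


def lambda_handler(event, context):
    """Hypothetical upstream Lambda: put the concurrency limit on the state output."""
    # "requested_concurrency" is an assumed input field; default to 3.
    concurrency = int(event.get("requested_concurrency", 3))
    if concurrency < 0:
        raise ValueError("MaxConcurrencyPath must resolve to a non-negative integer")
    # Keep the original input so the Map state can still find its items.
    return {**event, "Concurrency": concurrency}


def build_definition():
    """Illustrative ASL definition: the Map reads its limit from $.Concurrency."""
    return {
        "StartAt": "SetConcurrency",
        "States": {
            "SetConcurrency": {
                "Type": "Task",
                # Placeholder ARN for the handler above.
                "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:set-concurrency",
                "Next": "JoinLoop",
            },
            "JoinLoop": {
                "Type": "Map",
                "ItemsPath": "$.shipped",
                # Resolved at execution time; do not also set MaxConcurrency.
                "MaxConcurrencyPath": "$.Concurrency",
                "ItemProcessor": {
                    "ProcessorConfig": {"Mode": "INLINE"},
                    "StartAt": "ProcessItem",
                    "States": {"ProcessItem": {"Type": "Pass", "End": True}},
                },
                "End": True,
            },
        },
    }


if __name__ == "__main__":
    print(json.dumps(build_definition(), indent=2))
```

Per the Map state documentation, MaxConcurrencyPath must select a non-negative integer, and a Map state cannot specify both MaxConcurrency and MaxConcurrencyPath.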

https://docs.aws.amazon.com/step-functions/latest/dg/use-dist-map-orchestrate-large-scale-parallel-workloads.html

Upvotes: 0

fedonev

Reputation: 25799

You can set MaxConcurrency at state machine definition time, but not at execution time. As you experienced, the Map state's MaxConcurrency expects a number, but the States Language passes values dynamically as path strings (such as "$.input").

(Note: Map state iterations are concurrent by default. Docs: MaxConcurrency's "default value is 0, which places no quota on parallelism and iterations are invoked as concurrently as possible".)

Option 1: Choice + Map (Difficulty: Low)

A workaround that limits concurrency dynamically at execution-time is a Choice state that branches to discrete map states based on an input variable. Each branch's Map has a different maxConcurrency, but is otherwise identical. Add whichever discrete maxConcurrency choices you need. Choice also accepts a default case to catch unmatched choice input.

// execution input
{
  "concurrency": 5,
  "jobs": [ { "jobId": 1 }, { "jobId": 2 }, { "jobId": 3 }, { "jobId": 4}]
}
// state machine definition (partial)
"States": {
  "Max-Concurrency-Choice": {
    "Type": "Choice",
    "Choices": [
      {
        "Variable": "$.concurrency",
        "NumericEquals": 5,
        "Next": "MapState-MaxConcurrency-5"  // maxConcurrency in this branch is set at 5
      },
      {
        "Variable": "$.concurrency",
        "NumericEquals": 10,
        "Next": "MapState-MaxConcurrency-10" // maxConcurrency in this branch is set at 10
      }
    ],
    "Default": "MapState-MaxConcurrency-1" // maxConcurrency in the default branch is set at 1
  },

[Screenshot: Choice state machine]
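If you need more than a couple of levels, the Choice + Map layout above can be generated rather than written by hand. A sketch in Python, assuming a hypothetical build_choice_map helper and an illustrative Map template (a Pass state stands in for the real per-job work); state names follow the naming used in the definition above.

```python
import copy
import json

# Illustrative Map template: identical in every branch except MaxConcurrency.
MAP_TEMPLATE = {
    "Type": "Map",
    "ItemsPath": "$.jobs",
    "ItemProcessor": {
        "ProcessorConfig": {"Mode": "INLINE"},
        "StartAt": "ProcessJob",
        # Pass state as a stand-in for the real per-job work.
        "States": {"ProcessJob": {"Type": "Pass", "End": True}},
    },
    "End": True,
}


def build_choice_map(map_template, levels, default_level=1, variable="$.concurrency"):
    """Return a "States" fragment: one Choice state plus one Map copy per level."""
    unique_levels = sorted(set(levels))
    states = {
        "Max-Concurrency-Choice": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": variable,
                    "NumericEquals": level,
                    "Next": f"MapState-MaxConcurrency-{level}",
                }
                for level in unique_levels
            ],
            # Inputs that match no choice run with default_level.
            "Default": f"MapState-MaxConcurrency-{default_level}",
        }
    }
    for level in sorted(set(unique_levels) | {default_level}):
        branch = copy.deepcopy(map_template)  # never mutate the shared template
        branch["MaxConcurrency"] = level
        states[f"MapState-MaxConcurrency-{level}"] = branch
    return states


if __name__ == "__main__":
    print(json.dumps(build_choice_map(MAP_TEMPLATE, [5, 10]), indent=2))
```

Merge the returned states into the definition's "States" object and point StartAt (or the previous state's Next) at "Max-Concurrency-Choice".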

Option 2: Nested Sfn + API Call (Difficulty: High)

Nest your Sfn in a new Sfn. The new, parent Sfn takes a maxConcurrency in the input. It has two tasks:

  1. In a Lambda Task, call the UpdateStateMachine API with a new stringified JSON state machine definition for your current (child) Sfn.
  2. Invoke your current state machine. The Sfn will now run with the new MaxConcurrency.
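A sketch of the first (Lambda) task in Option 2, in Python. Assumptions: the event carries hypothetical fields childStateMachineArn, mapStateName and maxConcurrency; the Map state is a top-level state of the child definition (a Map nested inside another Map or Parallel would not be found); and the Lambda role may call states:DescribeStateMachine and states:UpdateStateMachine. set_max_concurrency is a hypothetical helper; describe_state_machine and update_state_machine are boto3 Step Functions client methods.

```python
import json


def set_max_concurrency(definition_json, state_name, max_concurrency):
    """Return a new definition string with one top-level Map's MaxConcurrency replaced."""
    if isinstance(max_concurrency, bool) or not isinstance(max_concurrency, int) or max_concurrency < 0:
        raise ValueError("MaxConcurrency must be a non-negative integer")
    definition = json.loads(definition_json)
    state = definition["States"][state_name]  # top-level states only
    if state.get("Type") != "Map":
        raise ValueError(f"{state_name} is not a Map state")
    state.pop("MaxConcurrencyPath", None)  # the two fields cannot coexist
    state["MaxConcurrency"] = max_concurrency
    return json.dumps(definition)


def lambda_handler(event, context):
    # boto3 ships with the AWS Lambda Python runtime; importing it here keeps
    # set_max_concurrency usable without boto3 installed.
    import boto3

    sfn = boto3.client("stepfunctions")
    arn = event["childStateMachineArn"]  # hypothetical input field
    current = sfn.describe_state_machine(stateMachineArn=arn)["definition"]
    updated = set_max_concurrency(current, event["mapStateName"], int(event["maxConcurrency"]))
    sfn.update_state_machine(stateMachineArn=arn, definition=updated)
    # The parent's next state starts the child execution with this ARN.
    return {"childStateMachineArn": arn}
```

One caveat with this design: the UpdateStateMachine documentation notes that updates are eventually consistent, so an execution started immediately afterwards may still use the previous definition, and two parent executions running at once would overwrite each other's setting.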

Upvotes: 1
