Reputation: 560
I have a state machine consisting of a first pre-process task that generates an array as output, which a subsequent map state loops over. The output array of the first task has gotten too big, and the state machine throws the error States.DataLimitExceeded: The state/task 'arn:aws:lambda:XYZ' returned a result with a size exceeding the maximum number of characters service limit.
Here is an example of the state-machine yaml:
stateMachines:
  myStateMachine:
    name: "myStateMachine"
    definition:
      StartAt: preProcess
      States:
        preProcess:
          Type: Task
          Resource:
            Fn::GetAtt: [preProcessLambda, Arn]
          Next: mapState
          ResultPath: "$.preProcessOutput"
        mapState:
          Type: Map
          ItemsPath: "$.preProcessOutput.data"
          MaxConcurrency: 100
          Iterator:
            StartAt: doMap
            States:
              doMap:
                Type: Task
                Resource:
                  Fn::GetAtt: [doMapLambda, Arn]
                End: true
          Next: ### next steps, not relevant
A possible solution I came up with would be for the preProcess state to save its output to an S3 bucket and have the mapState state read directly from it. Is this possible? At the moment the output of preProcess is ResultPath: "$.preProcessOutput" and mapState takes the array ItemsPath: "$.preProcessOutput.data" as input.
How would I need to adapt the YAML so that the map state reads directly from S3?
Upvotes: 7
Views: 11593
Reputation: 619
There is now a Map state in Distributed mode:
https://docs.aws.amazon.com/step-functions/latest/dg/concepts-asl-use-map-state-distributed.html
Use the Map state in Distributed mode when you need to orchestrate large-scale parallel workloads that meet any combination of the following conditions:
- The size of your dataset exceeds 256 KB.
- The workflow's execution event history exceeds 25,000 entries.
- You need a concurrency of more than 40 parallel iterations.
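For the original example, a Distributed-mode Map state can read the item array directly from an object in S3 via an ItemReader instead of receiving it in the state payload. A minimal sketch in ASL JSON (the bucket and key are assumptions, and preProcess would need to write its array to that object rather than return it):
"mapState": {
    "Type": "Map",
    "ItemReader": {
        "Resource": "arn:aws:states:::s3:getObject",
        "ReaderConfig": { "InputType": "JSON" },
        "Parameters": {
            "Bucket": "myBucket",
            "Key": "preProcessOutput.json"
        }
    },
    "ItemProcessor": {
        "ProcessorConfig": {
            "Mode": "DISTRIBUTED",
            "ExecutionType": "STANDARD"
        },
        "StartAt": "doMap",
        "States": {
            "doMap": {
                "Type": "Task",
                "Resource": "Your doMapLambda ARN",
                "End": true
            }
        }
    },
    "MaxConcurrency": 100,
    "End": true
}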
Upvotes: 1
Reputation: 5049
The proposed workarounds work for specific scenarios, but not for the general one in which processing a normal-sized payload can generate a list of items large enough to exceed the payload limit.
More generally, I think the problem can recur in any 1->N scenario, i.e. whenever one step may generate many step executions in the workflow.
One of the clearest ways to reduce the complexity of a task is to divide it into many smaller ones, so this is likely to be needed often. From a scalability perspective there is also a clear advantage: the more you break big computations into small ones, the more granularity, parallelism and optimization you can achieve.
That is what AWS intends to facilitate by increasing the maximum payload size; they call it dynamic parallelism.
The problem is that the Map state is the cornerstone of that: besides the service integrations (database queries, etc.), it is the only state that can dynamically derive many tasks from a single step. But there seems to be no way to tell it that the payload lives in a file.
A quick solution would be for AWS to add an optional persistence spec to each step, for example:
stateMachines:
  myStateMachine:
    name: "myStateMachine"
    definition:
      StartAt: preProcess
      States:
        preProcess:
          Type: Task
          Resource:
            Fn::GetAtt: [preProcessLambda, Arn]
          Next: mapState
          ResultPath: "$.preProcessOutput"
          OutputFormat:
            S3:
              Bucket: myBucket
              Compression:
                Format: gzip
        mapState:
          Type: Map
          ItemsPath: "$.preProcessOutput.data"
          InputFormat:
            S3:
              Bucket: myBucket
              Compression:
                Format: gzip
          MaxConcurrency: 100
          Iterator:
            StartAt: doMap
            States:
              doMap:
                Type: Task
                Resource:
                  Fn::GetAtt: [doMapLambda, Arn]
                End: true
          Next: ### next steps, not relevant
That way the Map could perform its work even over large payloads.
Upvotes: 0
Reputation: 63
Just writing this in case someone else comes across the issue - I recently had to solve this at work as well. I found what I thought to be a relatively simple solution, without the use of a second step function.
I'm using Python for this and will provide a few examples in Python, but the solution should be applicable to any language.
Assuming the pre-process output looks like so:
[
{Output_1},
{Output_2},
.
.
.
{Output_n}
]
And a simplified version of the section of the Step Function is defined as follows:
"PreProcess": {
"Type": "Task",
"Resource": "Your Resource ARN",
"Next": "Map State"
},
"Map State": {
Do a bunch of stuff
}
To handle the scenario where the PreProcess output exceeds the Step Functions payload:
Inside the PreProcess, batch the output into chunks small enough to not exceed the payload.
This is the most complicated step. You will need to do some experimenting to find the largest size a single batch can be. Once you have that number (it may be smart to make it dynamic), use numpy to split the original PreProcess output into that many batches.
import numpy as np
batches = np.array_split(original_pre_process_output, number_of_batches)
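As a minimal sketch of making that number dynamic (the 200 KB budget is an assumption chosen to stay under the 256 KB payload limit, not a value from this answer):
import json
import math

# Assumed per-batch budget, kept below the 256 KB state payload limit.
MAX_BATCH_BYTES = 200_000

def choose_batch_count(items):
    # Estimate the serialized size of the full output and derive how many
    # batches keep each chunk under the budget (always at least one).
    total_bytes = len(json.dumps(items).encode('utf-8'))
    return max(1, math.ceil(total_bytes / MAX_BATCH_BYTES))

number_of_batches = choose_batch_count(original_pre_process_output)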
Again inside the PreProcess, upload each batch to Amazon S3, saving the keys in a new list. This list of S3 keys will be the new PreProcess output.
In Python, this looks like so:
import json
import boto3

s3 = boto3.resource('s3')
batch_keys = []
for batch in batches:
    s3_batch_key = 'Your S3 Key here'
    # np.array_split returns numpy arrays, so convert the batch back to a
    # plain list before serializing it.
    s3.Bucket(YOUR_BUCKET).put_object(Key=s3_batch_key, Body=json.dumps(batch.tolist()))
    batch_keys.append({'batch_key': s3_batch_key})
In the solution I implemented, I used for batch_id, batch in enumerate(batches) to easily give each S3 key its own ID.
Wrap the 'Inner' Map State in an 'Outer' Map State, and create a Lambda function within the Outer Map to feed the batches to the Inner Map.
Now that we have a small output consisting of S3 keys, we need a way to open one at a time, feeding each batch into the original (now 'Inner') Map state.
To do this, first create a new Lambda function - this will represent the BatchJobs state. Next, wrap the initial Map state inside an Outer Map, like so:
"PreProcess": {
"Type": "Task",
"Resource": "Your Resource ARN",
"Next": "Outer Map"
},
"Outer Map": {
"Type": "Map",
"MaxConcurrency": 1,
"Next": "Original 'Next' used in the Inner map",
"Iterator": {
"StartAt": "BatchJobs",
"States": {
"BatchJobs": {
"Type": "Task",
"Resource": "Newly created Lambda Function ARN",
"Next": "Inner Map"
},
"Inner Map": {
Initial Map State, left as is.
}
}
}
}
Note the 'MaxConcurrency' parameter in the Outer Map - this simply ensures the batches are executed sequentially.
With this new Step Function definition, the BatchJobs state will receive {'batch_key': s3_batch_key} for each batch. The BatchJobs state then simply needs to get the object stored at that key and pass it to the Inner Map.
In Python, the BatchJobs Lambda function looks like so:
import json
import boto3

s3 = boto3.client('s3')

def batch_jobs_handler(event, context):
    # Fetch the batch that PreProcess uploaded to S3 and return it as the
    # input for the Inner Map.
    obj = s3.get_object(Bucket='YOUR_BUCKET_HERE', Key=event.get('batch_key'))
    return json.loads(obj['Body'].read().decode('utf-8'))
Update your workflow to handle the new structure of the output.
Before implementing this solution, your Map state outputs an array of outputs:
[
{Map_output_1},
{Map_output_2},
.
.
.
{Map_output_n}
]
With this solution, you will now get a list of lists, with each inner list containing the results of each batch:
[
    [
        {Batch_1_output_1},
        {Batch_1_output_2},
        .
        .
        .
        {Batch_1_output_n}
    ],
    [
        {Batch_2_output_1},
        {Batch_2_output_2},
        .
        .
        .
        {Batch_2_output_n}
    ],
    .
    .
    .
    [
        {Batch_n_output_1},
        {Batch_n_output_2},
        .
        .
        .
        {Batch_n_output_n}
    ]
]
Depending on your needs, you may need to adjust some code after the Map in order to handle the new format of the output.
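If the downstream steps expect the original flat shape, a one-line flatten is usually enough; a minimal sketch, assuming the nested result is available as outer_map_output:
# Flatten the list of per-batch result lists back into a single flat list.
flat_output = [item for batch_result in outer_map_output for item in batch_result]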
That's it! As long as you set the max batch size correctly, the only way you will hit a payload limit is if your list of S3 keys exceeds the payload limit.
Upvotes: 1
Reputation: 3648
As of September 2020 the payload limit on Step Functions has been increased 8-fold, from 32 KB to 256 KB.
Maybe it now fits within your requirements.
Upvotes: 1
Reputation: 8137
I am currently solving a similar problem at work too. Because a step function stores its entire state, you can pretty quickly run into problems as your JSON grows while it maps over all the values.
The only real way to solve this is to use hierarchies of step functions. That is, step functions on your step functions. So you have:
parent -> [batch1, batch2, batch...N]
And then each batch have a number of single jobs:
batch1 -> [j1,j2,j3...jBATCHSIZE]
I had a pretty simple step function, and I found that ~4k was about the max batch size I could have before I would start hitting state limits.
Not a pretty solution, but hey, it works.
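A minimal sketch of the parent level of that hierarchy, using the Step Functions nested-execution integration in ASL JSON (the state names, the $.batches array and the child state machine ARN are assumptions for illustration):
"RunBatches": {
    "Type": "Map",
    "ItemsPath": "$.batches",
    "Iterator": {
        "StartAt": "RunBatch",
        "States": {
            "RunBatch": {
                "Type": "Task",
                "Resource": "arn:aws:states:::states:startExecution.sync:2",
                "Parameters": {
                    "StateMachineArn": "Your batch state machine ARN",
                    "Input.$": "$"
                },
                "End": true
            }
        }
    },
    "End": true
}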
Upvotes: 3
Reputation: 46
I don't think it is possible to read directly from S3 at this time. There are a few things you could try to do to get around this limitation. One is making your own iterator and not using Map State. Another is the following:
Have a Lambda read your S3 file and chunk it by index or some id/key. The idea behind this step is to pass the Map state's iterator a much smaller payload. Say your data has the structure below.
[ { idx: 1, ...more keys }, {idx: 2, ...more keys }, { idx: 3, ...more keys }, ... 4,997 more objects of data ]
Say you want your iterator to process 1,000 rows at a time. Return the following tuples representing index ranges from your Lambda instead: [ [ 0, 999 ], [ 1000, 1999 ], [ 2000, 2999 ], [ 3000, 3999 ], [ 4000, 4999 ] ]
Your Map State will receive this new data structure and each iteration will be one of the tuples. Iteration #1: [ 0, 999 ], Iteration #2: [ 1000, 1999 ], etc
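A minimal sketch of producing those ranges in the chunking Lambda (the function name, total_rows and chunk size are assumptions):
CHUNK_SIZE = 1000  # rows handled per Map iteration (assumed)

def build_index_ranges(total_rows, chunk_size=CHUNK_SIZE):
    # Produce [start, end] pairs covering rows 0..total_rows-1 in fixed-size chunks.
    return [[start, min(start + chunk_size - 1, total_rows - 1)]
            for start in range(0, total_rows, chunk_size)]

# build_index_ranges(5000) -> [[0, 999], [1000, 1999], ..., [4000, 4999]]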
Inside your iterator, call a Lambda which uses the tuple indexes to query into your S3 file. AWS has a query language over S3 objects called Amazon S3 Select: https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-glacier-select-sql-reference-select.html
Here’s another great resource on how to use S3 select and get the data into a readable state with node: https://thetrevorharmon.com/blog/how-to-use-s3-select-to-query-json-in-node-js
So, for iteration #1, we are querying the first 1,000 objects in our data structure. I can now call whatever function I normally would have inside my iterator.
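A hedged sketch of that per-iteration query with boto3 (the bucket, key and idx field are assumptions, and the event is expected to be one [start, end] tuple):
import json
import boto3

s3 = boto3.client('s3')

def query_batch_handler(event, context):
    start, end = event  # one tuple from the Map state, e.g. [0, 999]
    response = s3.select_object_content(
        Bucket='YOUR_BUCKET_HERE',
        Key='your-data.json',
        ExpressionType='SQL',
        Expression=f"SELECT * FROM S3Object[*] s WHERE s.idx BETWEEN {start} AND {end}",
        InputSerialization={'JSON': {'Type': 'DOCUMENT'}},
        OutputSerialization={'JSON': {'RecordDelimiter': '\n'}},
    )
    # The result arrives as an event stream; collect the Records payloads and
    # parse one JSON object per line.
    raw = b''.join(e['Records']['Payload'] for e in response['Payload'] if 'Records' in e)
    return [json.loads(line) for line in raw.decode('utf-8').splitlines() if line]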
What's key about this approach is that the iterator's input path never receives a large data structure.
Upvotes: 1