Reputation: 3
I am setting up a pipeline where I take a Celery message from SQS, decode it, and start an AWS Step Function execution. Inside the Step Function I submit an AWS Batch job that runs my Python script.
The issue I am facing is that I want to update the job_id field in my DRF API endpoint when the job is submitted, so BEFORE the Docker container is started. However, I am not sure how to get the AWS Batch job ID out of the Step Function before the Batch job reaches the Running state.
The only solution I see is to submit the AWS Batch job directly from my Lambda function without starting the Step Function, but I do not want to do it this way because my other pipelines run through Step Functions and I want to keep things consistent. That's why I am asking for help here. Maybe someone has had a similar case and is willing to help.
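For reference, the direct-submission variant I would like to avoid would look roughly like this (sketch only; the function name is illustrative):

import boto3

batch = boto3.client('batch')

def submit_batch_job_directly(task_id, spider_id):
    # submit_job returns the jobId immediately, before the container starts,
    # but this bypasses the Step Function entirely.
    response = batch.submit_job(
        jobName='my_job',
        jobQueue='arn:aws:batch:eu-west-1:380921374981:job-queue/my_job-worker-queue',
        jobDefinition='arn:aws:batch:eu-west-1:380921374981:job-definition/my_job',
        parameters={'TASK_ID': task_id, 'SPIDER_ID': spider_id},
    )
    return response['jobId']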
Below are my AWS Lambda function and my AWS Step Function definition.
lambda_function.py:
import json
import logging
import os

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

client = boto3.client('stepfunctions')


def start_step_function(step_function_input):
    state_machine_arn = os.environ['STEP_FUNCTION_ARN_MYAPP']
    response = client.start_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps(step_function_input)
    )
    return response


def lambda_handler(event, context):
    try:
        for record in event['Records']:
            # Step 1: Decode the body (Celery message coming from SQS);
            # decode_body and extract_parameters are my own helpers, omitted here
            body_json = decode_body(record['body'])

            # Step 2: Extract parameters
            step_function_input = extract_parameters(body_json)
            logger.info(f"Step Function input: {step_function_input}")

            # Step 3: Start the Step Function execution
            start_step_function(step_function_input)

        return {
            'statusCode': 200,
            'body': json.dumps('Step Function executed successfully!')
        }
    except Exception as e:
        logger.error(f"Error processing record: {e}")
        return {
            'statusCode': 400,
            'body': json.dumps(f'Error processing record: {str(e)}')
        }
Step Function definition:
{
  "Comment": "A description of my state machine",
  "StartAt": "submit_my_job",
  "States": {
    "submit_my_job": {
      "Type": "Task",
      "Resource": "arn:aws:states:::batch:submitJob.sync",
      "Parameters": {
        "JobName": "my_job",
        "JobQueue": "arn:aws:batch:eu-west-1:380921374981:job-queue/my_job-worker-queue",
        "Parameters": {
          "TASK_ID.$": "$.task_id",
          "SPIDER_ID.$": "$.spider_id"
        },
        "JobDefinition": "arn:aws:batch:eu-west-1:380921374981:job-definition/my_job"
      },
      "Next": "Success",
      "TimeoutSeconds": 5400,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "Fail"
        }
      ]
    },
    "Success": {
      "Type": "Succeed"
    },
    "Fail": {
      "Type": "Fail"
    }
  },
  "TimeoutSeconds": 5400
}
I've tried adding a Step 4 in my Lambda to see what I can get at this stage, but the only thing available is the Step Function execution ARN.
...
            # Step 3: Start the Step Function execution
            response = start_step_function(step_function_input)

            # Extract jobID from the response (assuming it's in the response ARN)
            job_id = response['executionArn'].split(':')[-1]  # this is the Step Function execution ID, not the Batch job ID :(
            task_id = step_function_input['task_id']

            # Step 4: Update the task with the new job_id
            update_task_in_my_api(task_id, {"job_id": job_id}, reason="Step Function execution completed")
Ideally I want to somehow take the JobId from the Step Function task output, which looks like this when the task is in the Running state:
"JobArn": "arn:aws:batch:eu-west-1:380921374981:job/53425042-22b2-4cad-a6a1-a7d881001702",
"JobId": "53425042-22b2-4cad-a6a1-a7d881001702",
"JobName": "my_job",
"SdkHttpMetadata": {
"AllHttpHeaders": {
"Access-Control-Expose-Headers": [
"X-amzn-errortype,X-amzn-requestid,X-amzn-errormessage,X-amzn-trace-id,X-amz-apigw-id,date"
],
"x-amz-apigw-id": [
"YesN4HTFDoEEXSw="
],
"Access-Control-Allow-Origin": [
"*"
],
"Connection": [
"keep-alive"
],
"x-amzn-RequestId": [
"91eab790-5e04-45bf-a94c-fa4891cda6d5"
],
"Content-Length": [
"169"
],
"Date": [
"Tue, 28 May 2024 11:32:15 GMT"
],
"X-Amzn-Trace-Id": [
"Root=1-6655c0bd-520650647e897a7fa51a62b7"
],
"Content-Type": [
"application/json"
]
},
"HttpHeaders": {
"Access-Control-Allow-Origin": "*",
"Access-Control-Expose-Headers": "X-amzn-errortype,X-amzn-requestid,X-amzn-errormessage,X-amzn-trace-id,X-amz-apigw-id,date",
"Connection": "keep-alive",
"Content-Length": "169",
"Content-Type": "application/json",
"Date": "Tue, 28 May 2024 11:32:15 GMT",
"x-amz-apigw-id": "YesN4HTFDoEEXSw=",
"x-amzn-RequestId": "91eab790-5e04-45bf-a94c-fa4891cda6d5",
"X-Amzn-Trace-Id": "Root=1-6655c0bd-520650647e897a7fa51a62b7"
},
"HttpStatusCode": 200
},
"SdkResponseMetadata": {
"RequestId": "91eab790-5e04-45bf-a94c-fa4891cda6d5"
}
}
My question is: how can I get the JobId and update my API endpoint before my AWS Batch job is started?
Upvotes: 0
Views: 387