Adrian

Reputation: 3

How to get the AWS Batch Job ID from a Step Function?

I am setting up a pipeline where I take a Celery message from SQS, decode it, and start an AWS Step Function. Inside the Step Function I submit an AWS Batch job that runs my Python script.

The issue I am facing is that I want to update the job_id field in my DRF API endpoint when the job is submitted, i.e. BEFORE the Docker container is started. However, I am not sure how to get the AWS Batch Job ID from the Step Function before the Batch job reaches the Running state.

The only solution I see is to submit the AWS Batch job directly in my Lambda function without starting a Step Function, but I do not want to do it this way because my other pipelines run in Step Functions and I want to keep them consistent. That's why I am asking for help; maybe someone has had a similar case and is willing to share.
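
For reference, the direct-submission alternative I mention above would look roughly like this (a sketch only; the ARNs are the ones from my state machine definition below, and the parameter mapping is an assumption):

import boto3

batch = boto3.client('batch')

def submit_batch_job_directly(step_function_input):
    # SubmitJob returns jobId immediately, before the container starts,
    # which is exactly the value I need for my API.
    response = batch.submit_job(
        jobName='my_job',
        jobQueue='arn:aws:batch:eu-west-1:380921374981:job-queue/my_job-worker-queue',
        jobDefinition='arn:aws:batch:eu-west-1:380921374981:job-definition/my_job',
        parameters={
            'TASK_ID': step_function_input['task_id'],
            'SPIDER_ID': step_function_input['spider_id'],
        },
    )
    return response['jobId']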

Below are my AWS Lambda function and my AWS Step Function definition.

lambda_function.py:

import json
import logging
import os

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Step Functions client; decode_body and extract_parameters are my own
# helpers (a sketch of them follows below).
client = boto3.client('stepfunctions')


def start_step_function(step_function_input):
    state_machine_arn = os.environ['STEP_FUNCTION_ARN_MYAPP']

    response = client.start_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps(step_function_input)
    )

    return response


def lambda_handler(event, context):
    try:
        for record in event['Records']:
            # Step 1: Decode the body
            body_json = decode_body(record['body'])

            # Step 2: Extract parameters
            step_function_input = extract_parameters(body_json)

            logger.info(f"Step Function input: {step_function_input}")

            # Step 3: Start the Step Function execution
            start_step_function(step_function_input)

        return {
            'statusCode': 200,
            'body': json.dumps('Step Function executed successfully!')
        }
    except Exception as e:
        logger.error(f"Error processing record: {e}")
        return {
            'statusCode': 400,
            'body': json.dumps(f'Error processing record: {str(e)}')
        }
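
For completeness, a simplified sketch of what decode_body and extract_parameters do, assuming the SQS record body is a base64-encoded JSON payload (Celery's SQS transport base64-encodes messages); the payload field names are assumptions, while task_id and spider_id come from the state machine parameters below:

import base64
import json

def decode_body(body):
    # Assumption: the decoded Celery message body is plain JSON.
    return json.loads(base64.b64decode(body))

def extract_parameters(body_json):
    # The state machine expects task_id and spider_id (see the definition
    # below); where exactly they sit inside the payload is an assumption.
    return {
        "task_id": body_json["task_id"],
        "spider_id": body_json["spider_id"],
    }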

Step Function definition:

{
  "Comment": "A description of my state machine",
  "StartAt": "submit_my_job",
  "States": {
    "submit_my_job": {
      "Type": "Task",
      "Resource": "arn:aws:states:::batch:submitJob.sync",
      "Parameters": {
        "JobName": "my_job",
        "JobQueue": "arn:aws:batch:eu-west-1:380921374981:job-queue/my_job-worker-queue",
        "Parameters": {
          "TASK_ID.$": "$.task_id",
          "SPIDER_ID.$": "$.spider_id"
        },
        "JobDefinition": "arn:aws:batch:eu-west-1:380921374981:job-definition/my_job"
      },
      "Next": "Success",
      "TimeoutSeconds": 5400,
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "Next": "Fail"
        }
      ]
    },
    "Success": {
      "Type": "Succeed"
    },
    "Fail": {
      "Type": "Fail"
    }
  },
  "TimeoutSeconds": 5400
}

I've tried to add a Step 4 in my Lambda to see what I can get at this stage, but I can only refer to the Step Function execution ARN.

...
# Step 3: Start the Step Function execution
response = start_step_function(step_function_input)

# Extract jobID from the response (assuming it's in the response ARN)
job_id = response['executionArn'].split(':')[-1] # it is step function ID, not Batch job :(
task_id = step_function_input['task_id']

# Step 4: Update the task with the new job_id
update_task_in_my_api(task_id, {"job_id": job_id}, reason="Step Function execution completed")

Ideally I want to somehow take the JobId from the Step Function output; it looks like this while the task is in the Running state:


  "JobArn": "arn:aws:batch:eu-west-1:380921374981:job/53425042-22b2-4cad-a6a1-a7d881001702",
  "JobId": "53425042-22b2-4cad-a6a1-a7d881001702",
  "JobName": "my_job",
  "SdkHttpMetadata": {
    "AllHttpHeaders": {
      "Access-Control-Expose-Headers": [
        "X-amzn-errortype,X-amzn-requestid,X-amzn-errormessage,X-amzn-trace-id,X-amz-apigw-id,date"
      ],
      "x-amz-apigw-id": [
        "YesN4HTFDoEEXSw="
      ],
      "Access-Control-Allow-Origin": [
        "*"
      ],
      "Connection": [
        "keep-alive"
      ],
      "x-amzn-RequestId": [
        "91eab790-5e04-45bf-a94c-fa4891cda6d5"
      ],
      "Content-Length": [
        "169"
      ],
      "Date": [
        "Tue, 28 May 2024 11:32:15 GMT"
      ],
      "X-Amzn-Trace-Id": [
        "Root=1-6655c0bd-520650647e897a7fa51a62b7"
      ],
      "Content-Type": [
        "application/json"
      ]
    },
    "HttpHeaders": {
      "Access-Control-Allow-Origin": "*",
      "Access-Control-Expose-Headers": "X-amzn-errortype,X-amzn-requestid,X-amzn-errormessage,X-amzn-trace-id,X-amz-apigw-id,date",
      "Connection": "keep-alive",
      "Content-Length": "169",
      "Content-Type": "application/json",
      "Date": "Tue, 28 May 2024 11:32:15 GMT",
      "x-amz-apigw-id": "YesN4HTFDoEEXSw=",
      "x-amzn-RequestId": "91eab790-5e04-45bf-a94c-fa4891cda6d5",
      "X-Amzn-Trace-Id": "Root=1-6655c0bd-520650647e897a7fa51a62b7"
    },
    "HttpStatusCode": 200
  },
  "SdkResponseMetadata": {
    "RequestId": "91eab790-5e04-45bf-a94c-fa4891cda6d5"
  }
}

My question is: how can I get the JobId and update my API endpoint before my AWS Batch job is started?
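
(update_task_in_my_api is my own helper; as described above, it updates the task record in the DRF API. A minimal sketch of what it could look like, where the URL, auth scheme, and environment variable names are placeholders:)

import os

import requests

def update_task_in_my_api(task_id, fields, reason=""):
    # Hypothetical sketch: PATCH the task record in the DRF API.
    # 'reason' is only informational and is ignored here.
    url = f"{os.environ['MY_API_BASE_URL']}/tasks/{task_id}/"
    response = requests.patch(
        url,
        json=fields,
        headers={"Authorization": f"Token {os.environ['MY_API_TOKEN']}"},
        timeout=10,
    )
    response.raise_for_status()
    return response.json()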

Upvotes: 0

Views: 387

Answers (0)
