MountainBiker

Reputation: 411

State machine task gets stuck in "Running" state after calling SNS Topic

I have a Step Functions state machine with a task that publishes to an SNS topic, which then sends an email notification. The email is sent as expected, but the task then stays in the "Running" state instead of completing and ending the execution. Does anyone know where I'm going wrong or what might be causing this?

"ErrorNotification": {
  "Type": "Task",
  "Resource": "arn:aws:states:::sns:publish.waitForTaskToken",
  "OutputPath": "$",
  "Parameters": {
    "TopicArn": "<topic-arn>",
    "Message": {
      "Input.$": "$",
      "TaskToken.$": "$$.Task.Token"
    }
  },
  "End": true
},

Upvotes: 0

Views: 1028

Answers (1)

samtoddler

Reputation: 9635

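This specific line

"Resource":"arn:aws:states:::sns:publish.waitForTaskToken",

implements the Wait for a Callback with the Task Token pattern: after publishing, the task pauses until something returns the task token through SendTaskSuccess or SendTaskFailure. Until that happens, the state stays in "Running".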

Call Amazon SNS with Step Functions

The following includes a Task state that publishes to an Amazon SNS topic and then waits for the task token to be returned. See Wait for a Callback with the Task Token.

{  
   "StartAt":"Send message to SNS",
   "States":{  
      "Send message to SNS":{  
         "Type":"Task",
         "Resource":"arn:aws:states:::sns:publish.waitForTaskToken",
         "Parameters":{  
            "TopicArn":"arn:aws:sns:us-east-1:123456789012:myTopic",
            "Message":{  
               "Input.$":"$",
               "TaskToken.$":"$$.Task.Token"
            }
         },
         "End":true
      }
   }
}
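
If nothing actually needs to report back (for instance, the email is fire-and-forget), one option is the request-response form of the SNS integration, which completes as soon as the publish call returns. The following is only a minimal sketch of the ErrorNotification state from the question, written as a Python dict so it can be dumped to JSON; the variable name is made up for illustration and "<topic-arn>" is the same placeholder used in the question. The TaskToken field is dropped because the task token is only available to states that use the callback pattern.

```python
import json

# Hypothetical rewrite of the ErrorNotification state from the question.
error_notification = {
    "Type": "Task",
    # Request-response form: no ".waitForTaskToken" suffix, so the state
    # finishes once SNS accepts the message.
    "Resource": "arn:aws:states:::sns:publish",
    "Parameters": {
        "TopicArn": "<topic-arn>",  # placeholder, as in the question
        "Message": {"Input.$": "$"},
    },
    "End": True,
}

# Print the state in the JSON form a state machine definition expects.
print(json.dumps({"ErrorNotification": error_notification}, indent=2))
```

This only applies when no callback is required; if the workflow really has to wait for an external response, the .waitForTaskToken resource stays as it is and something has to return the token.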

With this pattern, you need to check that whatever handles the callback (usually a Lambda function) actually sends the final response, together with the task token, back to Step Functions.

For example, I handle the callback in a Lambda roughly like this, reporting either success or failure:


...
# Tell Step Functions the task is still alive while the work is in progress.
LOG.info(f"Sending task heartbeat for task ID {body['taskToken']}")
STEP_FUNCTIONS_CLIENT.send_task_heartbeat(taskToken=body["taskToken"])
# Stand-in for the real outcome of the work.
is_task_success = random.choice([True, False])

if is_task_success:
    # Resumes the waiting state; output must be a JSON string.
    LOG.info(f"Sending task success for task ID {body['taskToken']}")
    STEP_FUNCTIONS_CLIENT.send_task_success(
        taskToken=body["taskToken"],
        output=json.dumps({"id": body['id']})
    )
else:
    # Marks the waiting state as failed with the given cause.
    LOG.info(f"Sending task failure for task ID {body['taskToken']}")
    STEP_FUNCTIONS_CLIENT.send_task_failure(
        taskToken=body["taskToken"],
        cause="Random choice returned False."
    )
...
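
The snippet above starts from a body that already contains the token. As a rough sketch of how that could look end to end for the state machine in the question, the code below assumes a Lambda function subscribed to the SNS topic and a message shaped like the Parameters in the state definition (keys "Input" and "TaskToken"). The function names extract_callbacks and complete_tasks are made up for illustration, and the output payload is arbitrary.

```python
import json


def extract_callbacks(event):
    """Return (task_token, task_input) pairs from an SNS-triggered Lambda event."""
    callbacks = []
    for record in event.get("Records", []):
        # SNS hands the published message to Lambda as a JSON string.
        message = json.loads(record["Sns"]["Message"])
        callbacks.append((message["TaskToken"], message.get("Input")))
    return callbacks


def complete_tasks(event, sfn_client):
    """Report success for every task token found in the event."""
    for task_token, task_input in extract_callbacks(event):
        sfn_client.send_task_success(
            taskToken=task_token,
            # output must be a JSON string; this payload is only an example.
            output=json.dumps({"received": task_input}),
        )


def lambda_handler(event, context):
    # boto3 is imported here so the helpers above can be tried without AWS.
    import boto3

    complete_tasks(event, boto3.client("stepfunctions"))
```

Passing the client in as an argument keeps the token handling easy to check locally with a stub before wiring it to a real subscription.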

Upvotes: 1
