user626511

AWS Step Functions Activity Worker not seeing execution when worker was stopped

Version of AWS SDK for Go?

v2.0.0-preview.3

Version of Go (go version)?

go1.9.3 darwin/amd64

What issue did you see?

I'm writing an Activity Worker for Step Functions in Go.

When the worker is running and a new workflow execution is started, everything appears to be working fine.

However, when I stop the worker, start a workflow execution, and then start the worker again, the worker appears to poll SFN but does not execute the tasks initiated while it was stopped. If we start a new workflow execution at this point (while the worker is running), the worker performs the new task successfully. The workflows executed while the worker was stopped are not picked up by the worker.

EDIT: Looking at the execution history, I see the Timed Out status and the following event log:

[Screenshot: execution event history]

Steps to reproduce

Here is my SFN state machine:

{
  "Comment": "An example using a Task state.",
  "StartAt": "getGreeting",
  "Version": "1.0",
  "TimeoutSeconds": 300,
  "States":
  {
    "getGreeting": {
      "Type": "Task",
      "Resource": "arn:aws:states:ap-southeast-1:196709014601:activity:get-greeting",
      "End": true
    }
  }
}

Here is my SFN worker:

package main

import (
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/aws/endpoints"
    "github.com/aws/aws-sdk-go-v2/aws/external"
    "github.com/aws/aws-sdk-go-v2/service/sfn"
    "fmt"
    "encoding/json"
)

type Worker struct {
    svc             *sfn.SFN
    activityARN     string
}

type Task struct {
    input   *string
    token   *string
}

func New(activityARN string) *Worker {
    cfg, err := external.LoadDefaultAWSConfig()
    if err != nil {
        panic("unable to load SDK config, " + err.Error())
    }
    // Set the AWS Region that the service clients should use
    cfg.Region = endpoints.ApSoutheast1RegionID

    // Using the Config value, create the Step Functions client
    svc := sfn.New(cfg)

    w := &Worker{
        svc: svc,
        activityARN: activityARN,
    }
    return w
}

func (w *Worker) getTask() *Task {
    req := w.svc.GetActivityTaskRequest(&sfn.GetActivityTaskInput{
        ActivityArn: aws.String(w.activityARN),
    })
    res, err := req.Send()
    if err != nil {
        // res is nil when the request fails, so return nil instead of dereferencing it
        fmt.Println("failed to get tasks, " + err.Error())
        return nil
    }
    return &Task{
        input: res.Input,
        token: res.TaskToken,
    }
}

// Call SendTaskSuccess on success
func (w *Worker) handleSuccess(taskToken *string, output *string) error {
    req := w.svc.SendTaskSuccessRequest(&sfn.SendTaskSuccessInput{
        TaskToken: taskToken,
        Output: output, // JSON string; named "output" so it doesn't shadow encoding/json
    })
    _, err := req.Send()
    if err != nil { fmt.Println("failed to send task success result, "+err.Error()) }
    return err
}

// Call SendTaskFailure on error
func (w *Worker) handleFailure(taskToken *string, err error) error {
    errorMessage := err.Error()
    req := w.svc.SendTaskFailureRequest(&sfn.SendTaskFailureInput{
        TaskToken: taskToken,
        Error: &errorMessage,
    })

    _, err = req.Send()
    if err != nil { fmt.Println("failed to send task failure result, "+err.Error()) }
    return err
}

func main() {
    activityARN := "arn:aws:states:ap-southeast-1:196709014601:activity:get-greeting"
    worker := New(activityARN)

    fmt.Println("Starting worker")
    for {
        // 1. Poll GetActivityTask API for tasks
        fmt.Println("Polling for tasks")
        task := worker.getTask()
        // nil task: the poll failed; nil token: the poll ended without a task
        if task == nil || task.token == nil { continue }

        // 2. Do some actual work
        fmt.Println("Working")
        result, err := work(task.input)

        // 3. Notify SFN on success and failure
        fmt.Println("Sending results")
        if err == nil {
            worker.handleSuccess(task.token, result)
        } else {
            worker.handleFailure(task.token, err)
        }
    }
}

// Handles marshalling and un-marshalling JSON
func work(jsonInput *string) (*string, error) {
    input := &GreetInput{}
    if err := json.Unmarshal([]byte(*jsonInput), input); err != nil {
        return nil, err // malformed input is reported via SendTaskFailure
    }

    result, err := Greet(input) // Actual work
    if err != nil { return nil, err }

    outputBytes, err := json.Marshal(result)
    if err != nil { return nil, err }
    output := string(outputBytes)
    return &output, nil
}

// Actual handler code
type GreetInput struct {
    Who string
}

type GreetOutput struct {
    Message string
}

func Greet(input *GreetInput) (*GreetOutput, error) {
    message := fmt.Sprintf("hello %s", input.Who)
    output := &GreetOutput {
        Message: message,
    }
    fmt.Println(message)
    return output, nil
}

To run:

go build worker.go && ./worker

Answers (1)

Marcin Sucharski

Based on your update, I suppose the worker is not stopped gracefully (i.e. when you kill the worker, you do not wait until its GetActivityTask request ends), so Step Functions may hand the task to the already-dead worker.

So the sequence of events is as follows:

  1. The worker sends a GetActivityTask request and blocks until a task arrives or the request times out.
  2. The worker is killed without waiting for GetActivityTask to end.
  3. A new execution is started.
  4. Step Functions sees that a GetActivityTask request is still open and sends the task from the new execution to it.
  5. However, the worker is already dead, so it never receives that task. Step Functions considers the task delivered, so it waits until the task ends or times out.
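The sequence above can be illustrated with a toy, in-process simulation. This is not how Step Functions is implemented: the `dispatcher` type, its `poll` and `startExecution` methods and the channel hand-off are hypothetical stand-ins that only model the behaviour described in the answer, namely that a task goes to whichever poll is still open, even if the process that opened it is gone.

```go
package main

import "fmt"

// dispatcher is a hypothetical, in-process stand-in for the service side of
// activity polling. It is NOT how Step Functions is implemented; it only
// models the described behaviour: each new task is handed to the oldest
// open poll, with no way of knowing whether its caller is still alive.
type dispatcher struct {
	waiting []chan string // open long polls, oldest first
}

// poll opens a long-poll request and returns the channel on which a task
// will be delivered.
func (d *dispatcher) poll() chan string {
	ch := make(chan string, 1) // buffered, so delivery never blocks the service
	d.waiting = append(d.waiting, ch)
	return ch
}

// startExecution hands the task to the oldest open poll and reports whether
// the service considers it delivered.
func (d *dispatcher) startExecution(task string) bool {
	if len(d.waiting) == 0 {
		return false // no open poll: the task stays scheduled until someone polls
	}
	ch := d.waiting[0]
	d.waiting = d.waiting[1:]
	ch <- task
	return true
}

// simulate walks through steps 1-5 and then restarts the worker.
func simulate() (delivered bool, received []string) {
	d := &dispatcher{}

	// 1-2. A worker opens a poll and is killed; nobody reads this channel again.
	d.poll()

	// 3-5. A new execution starts and its task goes to the dead worker's poll.
	delivered = d.startExecution("execution-1")

	// The restarted worker opens a fresh poll, but execution-1 never arrives.
	fresh := d.poll()
	select {
	case t := <-fresh:
		received = append(received, t)
	default: // nothing has been delivered to the fresh poll
	}

	// An execution started while the restarted worker is polling is picked up.
	d.startExecution("execution-2")
	received = append(received, <-fresh)
	return delivered, received
}

func main() {
	delivered, received := simulate()
	fmt.Println("execution-1 delivered:", delivered)     // true
	fmt.Println("restarted worker received:", received) // [execution-2]
}
```

The first execution counts as delivered even though no live worker ever sees it, which matches the symptom in the question: only executions started after the restart are processed.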

To check whether that's the case, simply wait a bit after killing the worker (I don't know the default wait time for GetActivityTask in the AWS SDK for Go; 5 minutes should do the job) and then create an execution. If the new execution works as expected, you should add a graceful exit to the worker: wait until the pending GetActivityTask ends and, if it returns a task, process it.
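A graceful exit along those lines could look roughly like the sketch below. It assumes the worker is stopped with SIGINT or SIGTERM (SIGKILL cannot be caught). `runWorker`, `stopOnSignal` and `demo` are hypothetical helpers, and `poll`/`process` are placeholders for the question's `getTask` and for `work` plus `handleSuccess`/`handleFailure`; the stop request is only checked between polls, so an in-flight poll always finishes and its task is handled.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// stopOnSignal returns a channel that is closed when the process receives
// SIGINT (Ctrl-C) or SIGTERM, instead of letting the default handler exit
// immediately while a GetActivityTask poll is still open. SIGKILL cannot be
// caught, so a worker killed with it can still leave an orphaned poll.
func stopOnSignal() <-chan struct{} {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
	stop := make(chan struct{})
	go func() {
		<-sigs
		close(stop)
	}()
	return stop
}

// runWorker polls until stop is closed. stop is only checked between polls,
// so a poll already in flight runs to completion and any task it returns is
// processed before the loop exits. poll is a hypothetical stand-in for
// getTask that returns "" when the long poll ends without a task.
func runWorker(poll func() string, process func(token string), stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		default:
		}
		if token := poll(); token != "" {
			process(token)
		}
	}
}

// demo fakes a shutdown request arriving during the second poll.
func demo() []string {
	stop := make(chan struct{})
	var processed []string
	calls := 0
	poll := func() string {
		calls++
		if calls == 2 {
			close(stop) // "Ctrl-C" while this poll is still open
		}
		return fmt.Sprintf("token-%d", calls)
	}
	process := func(token string) { processed = append(processed, token) }
	runWorker(poll, process, stop)
	return processed
}

func main() {
	fmt.Println(demo()) // [token-1 token-2]

	// In the real worker this would become something like:
	//   runWorker(pollSFN, handleTask, stopOnSignal())
	// where pollSFN and handleTask are hypothetical wrappers around getTask,
	// work, handleSuccess and handleFailure.
}
```

The task returned by the poll that was open when the stop request arrived (`token-2`) is still processed. With the real service, shutdown can therefore take as long as one poll: Step Functions holds a GetActivityTask request open for up to 60 seconds before returning an empty response.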
