user3066737

Reputation: 143

boto3 and SWF example needed

Amazon are promoting boto3 for future development but do not provide enough documentation for the new boto3.

Does anybody have any example code of using SWF with boto3 that they would care to share?

Upvotes: 8

Views: 4119

Answers (2)

Sajjan Mishra

Reputation: 85

The link to the official documentation is [here][1].

There are plenty of code samples available; follow the link above or [this][2] one. The Available Services section lists every service that boto3 currently supports, along with detailed examples.

One example is using boto3 to get the count of open SWF workflow executions:

import datetime

import boto3
import dateutil.tz

domainName = 'NAME_OF_YOUR_SWF_DOMAIN'  # placeholder: your registered SWF domain

def lambda_handler(event, context):
    swfClient = boto3.client('swf')
    currentTimeZone = dateutil.tz.gettz('Australia/Brisbane')
    # Look at executions started within the last 24 hours
    latestDate = datetime.datetime.now(tz=currentTimeZone)
    oldestDate = latestDate - datetime.timedelta(days=1)

    countResponse = swfClient.count_open_workflow_executions(
        domain=domainName,
        startTimeFilter={
            'oldestDate': oldestDate,
            'latestDate': latestDate
        },
        typeFilter={
            'name': 'NAME_OF_YOUR_SWF_WORKFLOW_NAME',
            'version': 'VERSION_NUMBER'
        }
    )
    print("the count is " + str(countResponse['count']))
    print(countResponse)

This is what I used in my case to get the count of running executions of an SWF workflow type. The request format is well defined in the documentation mentioned above.

To use boto3 and SWF together, start by importing boto3 in the Python Lambda function, along with Python's datetime module. Then boto3.client('swf') creates the client through which we interact with SWF.
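As a minimal sketch of the time-window part of that call, the helper below builds the startTimeFilter dictionary from timezone-aware datetimes. The name build_start_time_filter and the use of UTC instead of Australia/Brisbane are assumptions for illustration, not part of the original answer.

```python
import datetime


def build_start_time_filter(now=None, window=datetime.timedelta(days=1)):
    """Return a startTimeFilter dict covering the `window` that ends at `now`."""
    if now is None:
        # Assumption: UTC is good enough; swap in your own timezone if needed
        now = datetime.datetime.now(tz=datetime.timezone.utc)
    return {
        'oldestDate': now - window,
        'latestDate': now,
    }
```

The result could then be passed as startTimeFilter=build_start_time_filter() to count_open_workflow_executions.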

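Another example is fetching the event history of a workflow execution:

swf = boto3.client('swf')

history = swf.get_workflow_execution_history(
    domain=domainName,
    execution={
        'workflowId': workflowId,  # the ID given when the execution was started
        'runId': runId             # the run ID that SWF returned for it
    },
)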
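A long history can come back in several pages: when more events remain, the response carries a nextPageToken that you send back on the next call. The generator below is a rough sketch of that loop; the function name iter_history_events and its parameters are made up for this example, and swf is expected to be a boto3 SWF client.

```python
def iter_history_events(swf, domain, workflow_id, run_id):
    """Yield every event of one execution, following nextPageToken."""
    kwargs = {
        'domain': domain,
        'execution': {'workflowId': workflow_id, 'runId': run_id},
    }
    while True:
        page = swf.get_workflow_execution_history(**kwargs)
        for event in page.get('events', []):
            yield event
        token = page.get('nextPageToken')
        if not token:
            break  # no more pages
        kwargs['nextPageToken'] = token
```

For example, [e['eventType'] for e in iter_history_events(swf, domainName, workflowId, runId)] would list the event types in order.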

Hope this helps you!

[1]: https://boto3.amazonaws.com/v1/documentation/api/latest/index.html
[2]: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html

Upvotes: 1

AJ Venturella

Reputation: 4912

This is the only example I have found so far:

https://github.com/jhludwig/aws-swf-boto3

The process overview looks like this (note that it is pulled directly from the link above, with some additional notes and arranged as more of a flow).

It should be noted that SWF operates on the names of things. It's up to your code to give those names an execution meaning. For example, your Decider will poll and, using the Task names, decide what happens next.

There are some things I am not exactly certain about. I believe TASKLIST references are a kind of namespacing: not really a list of things, but a way of isolating things by name. I could be totally wrong about that, but from my basic understanding, that's what I think it means.
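One way to picture this, as far as I understand it, is that each task list name behaves like its own queue: a task scheduled on a name is only handed to a poller asking for that same name. The toy model below is purely illustrative and never talks to SWF; ToyTaskLists and its methods are hypothetical names.

```python
from collections import defaultdict, deque


class ToyTaskLists:
    """In-memory stand-in for task lists: one FIFO queue per name."""

    def __init__(self):
        self._queues = defaultdict(deque)

    def schedule(self, task_list, task):
        # Tasks are isolated by the task list name they are scheduled on
        self._queues[task_list].append(task)

    def poll(self, task_list):
        # A poller only ever sees tasks for the name it asks for
        queue = self._queues[task_list]
        return queue.popleft() if queue else None
```

A worker polling 'email' would never receive a task scheduled on 'images', which is the isolation-by-name idea described above.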

You can run your Decider and Workers from ANYWHERE. Since they reach out and up to AWS, if your firewall allows 0.0.0.0/0 egress you will have access.

The AWS docs also mention that you can run a Lambda function, but I have not found out how to trigger that.

Create the boto3 swf client:

import boto3
from botocore.exceptions import ClientError

swf = boto3.client('swf')

Create a domain

try:
  swf.register_domain(
    name=DOMAIN, # string
    description="Test SWF domain",
    workflowExecutionRetentionPeriodInDays="10" # keep history for this many days
  )
except ClientError as e:
  # Most likely the domain already exists; the printed error code says for certain
  print("Domain already exists: ", e.response.get("Error", {}).get("Code"))

With the domain created we now register the workflow:

Register Workflow

try:
  swf.register_workflow_type(
    domain=DOMAIN, # string
    name=WORKFLOW, # string
    version=VERSION, # string
    description="Test workflow",
    defaultExecutionStartToCloseTimeout="250",
    defaultTaskStartToCloseTimeout="NONE",
    defaultChildPolicy="TERMINATE",
    defaultTaskList={"name": TASKLIST } # TASKLIST is a string
  )
  print("Test workflow created!")
except ClientError as e:
  print("Workflow already exists: ", e.response.get("Error", {}).get("Code"))

With our Workflow registered, we can now begin assigning tasks.

Assign Tasks to the Workflow.

You can assign N tasks. Remember, these are mainly strings; your code will give them execution meaning.

try:
  swf.register_activity_type(
    domain=DOMAIN,
    name="DoSomething",
    version=VERSION, # string
    description="This is a worker that does something",
    defaultTaskStartToCloseTimeout="NONE",
    defaultTaskList={"name": TASKLIST } # TASKLIST is a string
  )
  print("Worker created!")
except ClientError as e:
  print("Activity already exists: ", e.response.get("Error", {}).get("Code"))
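The except blocks above report "already exists" for any ClientError, which would also hide things like permission errors. A minimal sketch of a stricter check follows. It assumes SWF reports duplicates with the error codes DomainAlreadyExistsFault and TypeAlreadyExistsFault; the helper name register_once is hypothetical.

```python
ALREADY_EXISTS_CODES = {'DomainAlreadyExistsFault', 'TypeAlreadyExistsFault'}


def register_once(register_call, **kwargs):
    """Call an SWF register_* method; return False if it was already registered.

    Any other error is re-raised so real failures are not hidden.
    """
    try:
        register_call(**kwargs)
        return True
    except Exception as exc:
        # botocore's ClientError exposes the parsed error on `.response`
        response = getattr(exc, 'response', None) or {}
        if response.get('Error', {}).get('Code') in ALREADY_EXISTS_CODES:
            return False
        raise
```

Usage would look like register_once(swf.register_domain, name=DOMAIN, workflowExecutionRetentionPeriodInDays="10").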

Start the Workflow

With our Domain, Workflow, and Task created, we can now begin a workflow.

import boto3

swf = boto3.client('swf')

response = swf.start_workflow_execution(
  domain=DOMAIN, # string
  workflowId='test-1001',
  workflowType={
    "name": WORKFLOW, # string
    "version": VERSION # string
  },
  taskList={
    'name': TASKLIST
  },
  input=''
)

print("Workflow requested: ", response)

Note the workflowId: this is a custom identifier, for example str(uuid.uuid4()). From the docs:

The user defined identifier associated with the workflow execution. You can use this to associate a custom identifier with the workflow execution. You may specify the same identifier if a workflow execution is logically a restart of a previous execution. You cannot have two open workflow executions with the same workflowId at the same time.

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.start_workflow_execution
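Since two open executions cannot share a workflowId, generating a fresh one per start is a simple way to avoid clashes. A small sketch, assuming a readable prefix plus a UUID is acceptable for your use case; make_workflow_id is a hypothetical helper, and the 256-character limit is the maximum length the API reference gives for workflowId.

```python
import uuid


def make_workflow_id(prefix='test'):
    """Return a unique workflowId such as 'test-<uuid4>'."""
    workflow_id = '{}-{}'.format(prefix, uuid.uuid4())
    if len(workflow_id) > 256:
        raise ValueError('workflowId must be at most 256 characters')
    return workflow_id
```

You would then pass workflowId=make_workflow_id('order') to start_workflow_execution. If an execution is logically a restart of an earlier one, reuse the earlier identifier instead.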

At this point, nothing will happen because we don't have a Decider running, nor any Workers. Let's see what those look like.

Decider

Our decider will poll for a decision task and then decide what should happen next:

import boto3
from botocore.client import Config
import uuid

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

Note the timeout settings above. You can reference this PR to see the rationale behind it:

https://github.com/boto/botocore/pull/634

From the Boto3 SWF docs:

Workers should set their client side socket timeout to at least 70 seconds (10 seconds higher than the maximum time service may hold the poll request).

That PR is what made this possible in boto3.

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_decision_task

print("Listening for Decision Tasks")

while True:

  # Long poll: waits for a decision task or returns empty on timeout
  newTask = swf.poll_for_decision_task(
    domain=DOMAIN, # string
    taskList={'name': TASKLIST }, # TASKLIST is a string
    identity='decider-1', # any identity you would like to provide, it's recorded in the history
    reverseOrder=False)

  if 'taskToken' not in newTask:
    print("Poll timed out, no new task.  Repoll")

  elif 'events' in newTask:

    # Skip the Decision* events so the last entry is the event we must react to
    eventHistory = [evt for evt in newTask['events'] if not evt['eventType'].startswith('Decision')]
    lastEvent = eventHistory[-1]

    if lastEvent['eventType'] == 'WorkflowExecutionStarted':
      print("Dispatching task to worker", newTask['workflowExecution'], newTask['workflowType'])
      swf.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          {
            'decisionType': 'ScheduleActivityTask',
            'scheduleActivityTaskDecisionAttributes': {
                'activityType':{
                    'name': TASKNAME, # string, the registered activity name, e.g. "DoSomething"
                    'version': VERSION # string
                    },
                'activityId': 'activityid-' + str(uuid.uuid4()),
                'input': '',
                'scheduleToCloseTimeout': 'NONE',
                'scheduleToStartTimeout': 'NONE',
                'startToCloseTimeout': 'NONE',
                'heartbeatTimeout': 'NONE',
                'taskList': {'name': TASKLIST}, # TASKLIST is a string
            }
          }
        ]
      )
      print("Task Dispatched:", newTask['taskToken'])

    elif lastEvent['eventType'] == 'ActivityTaskCompleted':
      # The only activity has finished, so close the workflow execution
      swf.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          {
            'decisionType': 'CompleteWorkflowExecution',
            'completeWorkflowExecutionDecisionAttributes': {
              'result': 'success'
            }
          }
        ]
      )
      print("Task Completed!")

Note that at the end of this snippet, we check if we have ActivityTaskCompleted and we respond with the decision CompleteWorkflowExecution to let SWF know we are done.
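The decision logic can also be pulled out into a plain function, which makes it possible to exercise without polling AWS. The following is only a sketch of the same two-branch logic; the function name decide and its parameters are hypothetical, and returning an empty list for any other event is my own choice rather than something taken from the linked repository.

```python
import uuid


def decide(events, activity_name, activity_version, task_list):
    """Return the list of decisions for one decision task's event history."""
    # Skip the Decision* events so the last entry is the event to react to
    history = [e for e in events if not e['eventType'].startswith('Decision')]
    if not history:
        return []
    last_type = history[-1]['eventType']

    if last_type == 'WorkflowExecutionStarted':
        return [{
            'decisionType': 'ScheduleActivityTask',
            'scheduleActivityTaskDecisionAttributes': {
                'activityType': {'name': activity_name, 'version': activity_version},
                'activityId': 'activityid-' + str(uuid.uuid4()),
                'input': '',
                'scheduleToCloseTimeout': 'NONE',
                'scheduleToStartTimeout': 'NONE',
                'startToCloseTimeout': 'NONE',
                'heartbeatTimeout': 'NONE',
                'taskList': {'name': task_list},
            },
        }]

    if last_type == 'ActivityTaskCompleted':
        return [{
            'decisionType': 'CompleteWorkflowExecution',
            'completeWorkflowExecutionDecisionAttributes': {'result': 'success'},
        }]

    return []  # nothing to do for other events in this simple flow
```

Inside the polling loop, the result would be passed as decisions=decide(newTask['events'], TASKNAME, VERSION, TASKLIST) to respond_decision_task_completed.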

That's our decider. What does the worker look like?

Worker

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_activity_task

Note that, again, we set the read_timeout:

import boto3
from botocore.client import Config

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

Now we start our worker polling:

print("Listening for Worker Tasks")

while True:

  # Long poll: waits for an activity task or returns empty on timeout
  task = swf.poll_for_activity_task(
    domain=DOMAIN, # string
    taskList={'name': TASKLIST}, # TASKLIST is a string
    identity='worker-1') # identity is for our history

  if 'taskToken' not in task:
    print("Poll timed out, no new task.  Repoll")

  else:
    print("New task arrived")

    # The real work would happen here, before reporting the result
    swf.respond_activity_task_completed(
        taskToken=task['taskToken'],
        result='success'
    )

    print("Task Done")

Again we signal SWF that we have completed our work.
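Because SWF only knows the names of activities, the worker is where a name gets its execution meaning. One possible way to do that is a dictionary from activity name to a Python function, sketched below. The names run_activity and handlers are hypothetical; the task dictionary is assumed to have the shape returned by poll_for_activity_task, with activityType and input keys.

```python
def run_activity(task, handlers):
    """Run the handler registered for the task's activity name.

    Returns ('completed', result) or ('failed', reason).
    """
    name = task['activityType']['name']
    handler = handlers.get(name)
    if handler is None:
        return ('failed', 'no handler for activity ' + name)
    try:
        # SWF passes input and result around as strings
        return ('completed', str(handler(task.get('input', ''))))
    except Exception as exc:
        return ('failed', str(exc))
```

In the polling loop, a 'completed' outcome could be sent with respond_activity_task_completed(taskToken=task['taskToken'], result=...) and a 'failed' one with respond_activity_task_failed(taskToken=task['taskToken'], reason=...).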

Upvotes: 17
