fuggy_yama

Reputation: 617

How to wait for a step completion in AWS EMR cluster using Boto3

Given a step id I want to wait for that AWS EMR step to finish. How can I achieve this? Is there a built-in function?

At the time of writing, the Boto3 waiters for EMR only support waiting for the Cluster Running and Cluster Terminated events:

EMR Waiters

Upvotes: 7

Views: 8561

Answers (4)

Laren Crawford

Reputation: 669

I wrote a generic status_poller function as part of an interactive demo of EMR on GitHub.

The status_poller function loops and calls a function, printing '.' or the new status until the specified status is returned:

import sys
import time

def status_poller(intro, done_status, func):
    """
    Polls a function for status, sleeping for 10 seconds between each query,
    until the specified status is returned.
    :param intro: An introductory sentence that informs the reader what we're
                  waiting for.
    :param done_status: The status we're waiting for. This function polls the status
                        function until it returns the specified status.
    :param func: The function to poll for status. This function must eventually
                 return the expected done_status or polling will continue indefinitely.
    """
    status = None
    print(intro)
    print("Current status: ", end='')
    while status != done_status:
        prev_status = status
        status = func()
        if prev_status == status:
            print('.', end='')
        else:
            print(status, end='')
        sys.stdout.flush()
        time.sleep(10)
    print()

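To wait for a step to complete, you'd call it like this (note that polling never stops if the step ends in a state other than COMPLETED, such as FAILED or CANCELLED):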

status_poller(
    "Waiting for step to complete...",
    'COMPLETED',
    lambda:
    emr_basics.describe_step(cluster_id, step_id, emr_client)['Status']['State'])

Upvotes: 0

Rich Smith

Reputation: 1695

There is now a waiter available for step complete events. It was added in a recent boto3 version.

http://boto3.readthedocs.io/en/latest/reference/services/emr.html#EMR.Waiter.StepComplete

Example code:

import boto3

client = boto3.client("emr")
waiter = client.get_waiter("step_complete")
waiter.wait(
    ClusterId='the-cluster-id',
    StepId='the-step-id',
    WaiterConfig={
        "Delay": 30,
        "MaxAttempts": 10
    }
)
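
With Delay set to 30 and MaxAttempts set to 10, the waiter above gives up after about five minutes and raises botocore.exceptions.WaiterError, which is short for many EMR steps. Below is a minimal sketch of deriving a WaiterConfig from a desired timeout. The helper name waiter_config_for_timeout is hypothetical and not part of Boto3; only the "Delay" and "MaxAttempts" keys come from the example above.

```python
import math


def waiter_config_for_timeout(timeout_seconds, delay_seconds=30):
    """Build a WaiterConfig dict whose attempts cover timeout_seconds.

    Hypothetical helper for illustration; not part of Boto3.
    """
    if timeout_seconds <= 0 or delay_seconds <= 0:
        raise ValueError("timeout_seconds and delay_seconds must be positive")
    # Round up so the waiter never stops before the requested timeout.
    max_attempts = math.ceil(timeout_seconds / delay_seconds)
    return {"Delay": delay_seconds, "MaxAttempts": max_attempts}


# Allow roughly one hour, polling every 30 seconds.
config = waiter_config_for_timeout(3600)
```

The resulting dict can be passed as the WaiterConfig argument of waiter.wait in place of the hard-coded values.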

Upvotes: 11

fuggy_yama

Reputation: 617

I came up with the following code (if you set max_attempts to 0 or less, it simply waits until there are no running or pending steps left on the cluster):

import json
import time

def wait_for_steps_completion(emr_client, emr_cluster_id, max_attempts=0):
    sleep_seconds = 30
    num_attempts = 0

    while True:
        response = emr_client.list_steps(
            ClusterId=emr_cluster_id,
            StepStates=['PENDING', 'CANCEL_PENDING', 'RUNNING']
        )
        num_attempts += 1
        active_aws_emr_steps = response['Steps']

        if active_aws_emr_steps:
            if 0 < max_attempts <= num_attempts:
                raise Exception(
                    'Max attempts exceeded while waiting for AWS EMR steps completion. Last response:\n'
                    + json.dumps(response, indent=3, default=str)
                )
            time.sleep(sleep_seconds)
        else:
            return

Upvotes: 5

helloV

Reputation: 52375

There is no built-in function in Boto3. But you can write your own waiter.

See: describe_step

Call describe_step with the cluster ID and step ID. The response is a dictionary that contains details about the step; the step state is found under response['Step']['Status']['State']. If the state is not COMPLETED, wait a few seconds and try again until it is COMPLETED or the wait time exceeds your limit.

'State': 'PENDING'|'CANCEL_PENDING'|'RUNNING'|'COMPLETED'|'CANCELLED'|'FAILED'|'INTERRUPTED'
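
The polling loop described above could be sketched roughly as follows. This is a minimal sketch, not a definitive implementation: the function name wait_for_step, its delay, max_attempts, and sleep parameters, and the choice to raise on CANCELLED, FAILED, or INTERRUPTED are assumptions of this example. Only describe_step and the state names come from the text above.

```python
import time

# Terminal states taken from the list above; any other state means
# the step is still in progress.
SUCCESS_STATE = "COMPLETED"
FAILURE_STATES = {"CANCELLED", "FAILED", "INTERRUPTED"}


def wait_for_step(emr_client, cluster_id, step_id,
                  delay=30, max_attempts=60, sleep=time.sleep):
    """Poll describe_step until the step completes, fails, or we give up.

    Hypothetical helper; `sleep` is injectable so the loop can be tested
    without actually waiting.
    """
    for attempt in range(max_attempts):
        response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
        state = response["Step"]["Status"]["State"]
        if state == SUCCESS_STATE:
            return state
        if state in FAILURE_STATES:
            # Stop early instead of polling a step that can never complete.
            raise RuntimeError(f"Step {step_id} ended in state {state}")
        if attempt < max_attempts - 1:
            sleep(delay)
    raise TimeoutError(
        f"Step {step_id} did not complete after {max_attempts} attempts"
    )
```

Calling wait_for_step(boto3.client("emr"), cluster_id, step_id) would then block until the step finishes, raising if it fails or the attempt limit is reached.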

Upvotes: 4
