Reputation: 617
Given a step ID, I want to wait for that AWS EMR step to finish. How can I achieve this? Is there a built-in function?
At the time of writing, the Boto3 waiters for EMR only let you wait for the ClusterRunning and ClusterTerminated events.
Upvotes: 7
Views: 8561
Reputation: 669
I wrote a generic status_poller function as part of an interactive demo of EMR on GitHub.
The status_poller function loops and calls a function, printing '.' or the new status until the specified status is returned:
```python
import sys
import time


def status_poller(intro, done_status, func):
    """
    Polls a function for status, sleeping for 10 seconds between each query,
    until the specified status is returned.

    :param intro: An introductory sentence that informs the reader what we're
                  waiting for.
    :param done_status: The status we're waiting for. This function polls the status
                        function until it returns the specified status.
    :param func: The function to poll for status. This function must eventually
                 return the expected done_status or polling will continue indefinitely.
    """
    status = None
    print(intro)
    print("Current status: ", end='')
    while status != done_status:
        prev_status = status
        status = func()
        if prev_status == status:
            # Unchanged status: print a progress dot.
            print('.', end='')
        else:
            # New status: print it once.
            print(status, end='')
        sys.stdout.flush()
        # Only sleep if we still have to query again.
        if status != done_status:
            time.sleep(10)
    print()
```
To wait for a step to complete, you'd call it like this:
```python
status_poller(
    "Waiting for step to complete...",
    'COMPLETED',
    lambda: emr_basics.describe_step(cluster_id, step_id, emr_client)['Status']['State'])
```
Upvotes: 0
Reputation: 1695
There is now a waiter available for step complete events. It was added in a recent boto3 version.
http://boto3.readthedocs.io/en/latest/reference/services/emr.html#EMR.Waiter.StepComplete
Example code:
```python
import boto3

client = boto3.client("emr")
waiter = client.get_waiter("step_complete")
waiter.wait(
    ClusterId='the-cluster-id',
    StepId='the-step-id',
    WaiterConfig={
        "Delay": 30,
        "MaxAttempts": 10
    }
)
```
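Note that the waiter polls at most MaxAttempts times, Delay seconds apart, so the config above gives up after roughly 30 × 10 seconds (about five minutes), which can be too short for a long-running step. When it gives up (and, as far as I know, also when the step ends FAILED or CANCELLED), wait() raises botocore.exceptions.WaiterError. The helper below is a hypothetical sketch, not part of boto3, that derives a WaiterConfig from a target timeout:

```python
import math


def waiter_config_for_timeout(timeout_seconds: float, delay_seconds: int = 30) -> dict:
    """Build a WaiterConfig dict that keeps polling for at least timeout_seconds.

    Hypothetical helper (not a boto3 API). It assumes the waiter polls once per
    attempt and sleeps delay_seconds between attempts.
    """
    if timeout_seconds <= 0 or delay_seconds <= 0:
        raise ValueError("timeout_seconds and delay_seconds must be positive")
    # Round up so the waiter never gives up earlier than requested, plus one
    # attempt for the initial poll that happens before the first sleep.
    max_attempts = math.ceil(timeout_seconds / delay_seconds) + 1
    return {"Delay": delay_seconds, "MaxAttempts": max_attempts}
```

You would then pass WaiterConfig=waiter_config_for_timeout(2 * 60 * 60) in the waiter.wait(...) call above to allow about two hours.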
Upvotes: 11
Reputation: 617
I came up with the following code. If you set max_attempts to 0 or less, it simply waits until there are no running or pending steps left:
```python
import json
import time


def wait_for_steps_completion(emr_client, emr_cluster_id, max_attempts=0):
    sleep_seconds = 30
    num_attempts = 0
    while True:
        # Only ask for steps that are still queued, being cancelled, or running.
        response = emr_client.list_steps(
            ClusterId=emr_cluster_id,
            StepStates=['PENDING', 'CANCEL_PENDING', 'RUNNING']
        )
        num_attempts += 1
        active_aws_emr_steps = response['Steps']
        if active_aws_emr_steps:
            if 0 < max_attempts <= num_attempts:
                raise Exception(
                    'Max attempts exceeded while waiting for AWS EMR steps completion. Last response:\n'
                    + json.dumps(response, indent=3, default=str)
                )
            time.sleep(sleep_seconds)
        else:
            # No active steps are left on the cluster.
            return
```
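An empty list of active steps only means nothing is still queued or running, not that every step succeeded. A possible follow-up check, sketched here as a hypothetical helper (find_unsuccessful_steps is not part of the original answer or of boto3), filters list_steps by step ID and reports every step whose final state is not COMPLETED. It follows the Marker field for pagination; check the ListSteps documentation for any limit on how many StepIds a single call accepts.

```python
from typing import Iterable, List


def find_unsuccessful_steps(emr_client, emr_cluster_id: str, step_ids: Iterable[str]) -> List[dict]:
    """Return Id/Name/State for each listed step that did not end COMPLETED.

    Hypothetical helper meant to run after wait_for_steps_completion().
    emr_client is assumed to be a boto3 EMR client (anything with list_steps).
    """
    step_ids = list(step_ids)
    if not step_ids:
        # Without a StepIds filter, list_steps would return every step on the cluster.
        return []
    unsuccessful = []
    kwargs = {"ClusterId": emr_cluster_id, "StepIds": step_ids}
    while True:
        response = emr_client.list_steps(**kwargs)
        for step in response["Steps"]:
            state = step["Status"]["State"]
            if state != "COMPLETED":
                unsuccessful.append({"Id": step["Id"], "Name": step.get("Name"), "State": state})
        # A Marker in the response means there are more pages to fetch.
        marker = response.get("Marker")
        if not marker:
            return unsuccessful
        kwargs["Marker"] = marker
```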
Upvotes: 5
Reputation: 52375
There is no built-in function in Boto3, but you can write your own waiter.
See: describe_step
Call describe_step with cluster_id and step_id. The response is a dictionary that contains details about the step; the step's state is at response['Step']['Status']['State']. If the state is not COMPLETED, wait a few seconds and try again until it is COMPLETED or the wait time exceeds your limit. Stop early if the step reaches FAILED, CANCELLED, or INTERRUPTED, since it will not become COMPLETED from those states.
'State': 'PENDING'|'CANCEL_PENDING'|'RUNNING'|'COMPLETED'|'CANCELLED'|'FAILED'|'INTERRUPTED'
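A minimal sketch of such a hand-written waiter, assuming emr_client is a boto3 EMR client. wait_for_step and StepWaitError are hypothetical names, and treating CANCELLED, FAILED and INTERRUPTED as final states is an assumption based on the state list above. The sleep function is a parameter so the loop can be exercised without actually waiting.

```python
import time
from typing import Callable

# Assumption: a step in one of these states will never reach COMPLETED.
TERMINAL_FAILURE_STATES = {"CANCELLED", "FAILED", "INTERRUPTED"}


class StepWaitError(Exception):
    """Hypothetical error: the step failed or did not finish in time."""


def wait_for_step(emr_client, cluster_id: str, step_id: str,
                  delay_seconds: float = 30, timeout_seconds: float = 3600,
                  sleep: Callable[[float], None] = time.sleep) -> dict:
    """Poll describe_step until the step is COMPLETED and return its 'Step' dict.

    Raises StepWaitError if the step reaches a terminal failure state or if
    the next poll would fall after the timeout_seconds deadline.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
        state = response["Step"]["Status"]["State"]
        if state == "COMPLETED":
            return response["Step"]
        if state in TERMINAL_FAILURE_STATES:
            raise StepWaitError(f"Step {step_id} ended in state {state}")
        if time.monotonic() + delay_seconds > deadline:
            raise StepWaitError(f"Timed out waiting for step {step_id}; last state {state}")
        sleep(delay_seconds)
```

For example: step = wait_for_step(boto3.client("emr"), "the-cluster-id", "the-step-id", timeout_seconds=2 * 60 * 60).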
Upvotes: 4