Reputation: 89
I've been successfully querying S3 via Athena from inside a Lambda function for quite some time, but it has suddenly stopped working. Further investigation shows that the response from get_query_execution() now returns a state of 'QUEUED' (which I was led to believe is not used?!)
My code is as follows:
import csv
import json
import time

import boto3
import botocore

BUCKET = 'my-athena-results-bucket'  # placeholder - the bucket Athena writes results to
client = boto3.client('athena')
s3 = boto3.resource('s3')

def run_query(query, database, s3_output, max_execution=5):
    # Kick off the query; Athena writes the results as CSV to s3_output
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output
        })
    execution_id = response['QueryExecutionId']
    print("QueryExecutionId = " + str(execution_id))

    # Poll until the query leaves the RUNNING state or we run out of attempts
    state = 'RUNNING'
    while max_execution > 0 and state in ['RUNNING']:
        max_execution = max_execution - 1
        print("maxexecution=" + str(max_execution))
        response = client.get_query_execution(QueryExecutionId=execution_id)
        if 'QueryExecution' in response and \
                'Status' in response['QueryExecution'] and \
                'State' in response['QueryExecution']['Status']:
            state = response['QueryExecution']['Status']['State']
            print(state)
            if state == 'SUCCEEDED':
                print("Query SUCCEEDED: {}".format(execution_id))
                # Download the result CSV and parse it into a list of dicts
                s3_key = 'athena_output/' + execution_id + '.csv'
                print(s3_key)
                local_filename = '/tmp/' + execution_id + '.csv'
                print(local_filename)
                rows = []
                try:
                    print("s3key =" + s3_key)
                    print("localfilename = " + local_filename)
                    s3.Bucket(BUCKET).download_file(s3_key, local_filename)
                    with open(local_filename) as csvfile:
                        reader = csv.DictReader(csvfile)
                        for row in reader:
                            rows.append(row)
                except botocore.exceptions.ClientError as e:
                    if e.response['Error']['Code'] == "404":
                        print("The object does not exist.")
                        print(e)
                    else:
                        raise
                return json.dumps(rows)
            elif state == 'FAILED':
                return False
        time.sleep(10)
    return False
So the code is working as written - it's just that the 'QUEUED' state is completely unexpected and I'm not sure what to do about it. What can cause the query execution to become 'QUEUED', and what needs to change in my code to cater for it?
Upvotes: 2
Views: 4465
Reputation: 89
Got this response from AWS - there have been changes to Athena that caused this issue (although QUEUED has been in the state enum for some time, it hasn't been used until now):
The Athena team recently deployed a host of new functionality for Athena, including more granular CloudWatch metrics for Athena queries.
For more information:
AWS What's New page
Athena docs on CloudWatch metrics
As part of the deployment of more granular metrics, Athena now includes a QUEUED status for queries. This status indicates that an Athena query is waiting for resources to be allocated for processing. Query flow is roughly:
SUBMITTED -> QUEUED -> RUNNING -> COMPLETED/FAILED
Note that queries that fail due to system errors can be put back into the queue and retried.
I apologise for the frustration that this change has caused.
It seems like the forum formatting has stripped some elements from your code snippets.
However, I think that your WHILE loop is working on an array of the possible query statuses, which didn't previously cater for QUEUED. If that is the case, then yes, adding QUEUED to that array will allow your application to handle the new status.
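A minimal sketch of just that change, assuming client is the boto3 Athena client from your code (wait_for_query is a hypothetical helper name, not part of your original function):

import time

import boto3

client = boto3.client('athena')

def wait_for_query(execution_id, max_execution=5):
    # QUEUED must now be treated as an in-progress state alongside RUNNING
    state = 'QUEUED'
    while max_execution > 0 and state in ['QUEUED', 'RUNNING']:
        max_execution -= 1
        response = client.get_query_execution(QueryExecutionId=execution_id)
        state = response['QueryExecution']['Status']['State']
        if state in ['QUEUED', 'RUNNING']:
            time.sleep(10)
    return state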
Upvotes: 3
Reputation: 4923
Take a look at the Athena hook in Apache Airflow, which handles this the same way. Athena has final states (SUCCEEDED, FAILED and CANCELLED) and intermediate states (RUNNING and QUEUED). QUEUED is a normal state for a query before it gets started. So you could use code like this:
# client, s3 and BUCKET are set up exactly as in the question
def run_query(query, database, s3_output, max_execution=5):
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output
        })
    execution_id = response['QueryExecutionId']
    print("QueryExecutionId = " + str(execution_id))

    # A query may sit in QUEUED before it is RUNNING, so start from QUEUED
    state = 'QUEUED'
    # ...and treat both QUEUED and RUNNING as intermediate states while polling
    while max_execution > 0 and state in ['RUNNING', 'QUEUED']:
        max_execution = max_execution - 1
        print("maxexecution=" + str(max_execution))
        response = client.get_query_execution(QueryExecutionId=execution_id)
        if 'QueryExecution' in response and \
                'Status' in response['QueryExecution'] and \
                'State' in response['QueryExecution']['Status']:
            state = response['QueryExecution']['Status']['State']
            print(state)
            if state == 'SUCCEEDED':
                print("Query SUCCEEDED: {}".format(execution_id))
                s3_key = 'athena_output/' + execution_id + '.csv'
                print(s3_key)
                local_filename = '/tmp/' + execution_id + '.csv'
                print(local_filename)
                rows = []
                try:
                    print("s3key =" + s3_key)
                    print("localfilename = " + local_filename)
                    s3.Bucket(BUCKET).download_file(s3_key, local_filename)
                    with open(local_filename) as csvfile:
                        reader = csv.DictReader(csvfile)
                        for row in reader:
                            rows.append(row)
                except botocore.exceptions.ClientError as e:
                    if e.response['Error']['Code'] == "404":
                        print("The object does not exist.")
                        print(e)
                    else:
                        raise
                return json.dumps(rows)
            # CANCELLED is the third terminal state alongside SUCCEEDED and FAILED
            elif state == 'FAILED' or state == 'CANCELLED':
                return False
        time.sleep(10)
    return False
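For example, a caller could use it like this (the query, database and output location below are placeholders):

# Hypothetical invocation - query, database and output location are placeholders
result = run_query(
    query='SELECT * FROM my_table LIMIT 10',
    database='my_database',
    s3_output='s3://my-athena-results-bucket/athena_output/')
if result is False:
    print('Query FAILED, was CANCELLED, or timed out')
else:
    print(json.loads(result))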
Upvotes: 4