user1133512

Querying Athena from a Lambda function: QUEUED state?

I've been successfully querying S3 via Athena from inside a Lambda function for quite some time, but it has suddenly stopped working. Further investigation shows that get_query_execution() is returning a state of 'QUEUED' (which I was led to believe is not used).

My code is as follows:

import csv
import json
import time

import boto3
import botocore.exceptions

client = boto3.client('athena')
s3 = boto3.resource('s3')
# BUCKET, the bucket that s3_output points into, is defined elsewhere in the module.

def run_query(query, database, s3_output, max_execution=5):
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output
        })

    execution_id = response['QueryExecutionId']
    print("QueryExecutionId = " + str(execution_id))
    state = 'RUNNING'

    # Poll until the state leaves 'RUNNING' or the attempts run out.
    while max_execution > 0 and state in ['RUNNING']:
        max_execution = max_execution - 1
        print("maxexecution=" + str(max_execution))
        response = client.get_query_execution(QueryExecutionId=execution_id)

        if 'QueryExecution' in response and \
                'Status' in response['QueryExecution'] and \
                'State' in response['QueryExecution']['Status']:

            state = response['QueryExecution']['Status']['State']
            print(state)
            if state == 'SUCCEEDED':
                print("Query SUCCEEDED: {}".format(execution_id))

                # Results are expected at athena_output/<execution_id>.csv in BUCKET.
                s3_key = 'athena_output/' + execution_id + '.csv'
                print(s3_key)
                local_filename = '/tmp/' + execution_id + '.csv'
                print(local_filename)

                rows = []
                try:
                    print("s3key =" + s3_key)
                    print("localfilename = " + local_filename)
                    s3.Bucket(BUCKET).download_file(s3_key, local_filename)
                    with open(local_filename) as csvfile:
                        reader = csv.DictReader(csvfile)
                        for row in reader:
                            rows.append(row)
                except botocore.exceptions.ClientError as e:
                    if e.response['Error']['Code'] == "404":
                        print("The object does not exist.")
                        print(e)
                    else:
                        raise
                return json.dumps(rows)
            elif state == 'FAILED':
                return False
        time.sleep(10)
    return False

So the code itself works as it should; it's just that the 'QUEUED' state is completely unexpected and I'm not sure how to handle it. What can cause a query execution to become 'QUEUED', and what needs to change in my code to cater for it?

Upvotes: 2

Views: 4465

Answers (2)

user1133512

I got this response from AWS: there have been changes to Athena that caused this issue (although QUEUED has been in the state enum for some time, it hadn't been used until now):

The Athena team recently deployed a host of new functionality for Athena, including more granular CloudWatch metrics for Athena queries.

For more information:

As part of the deployment of more granular metrics, Athena now includes a QUEUED status for queries. This status indicates that an Athena query is waiting for resources to be allocated for processing. Query flow is roughly:

SUBMITTED -> QUEUED -> RUNNING -> COMPLETED/FAILED

Note that queries that fail due to system errors can be put back into the queue and retried.

I apologise for the frustration that this change has caused.

It seems like the forum formatting has stripped some elements from your code snippets. However, I think that your WHILE loop is working on an array of the possible query statuses, which didn't previously cater for QUEUED. If that is the case, then yes, adding QUEUED to that array will allow your application to handle the new status.
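A minimal sketch of that change, written so it can be exercised without AWS. The state names are the values Athena's get_query_execution() actually reports (QUEUED, RUNNING, SUCCEEDED, FAILED, CANCELLED); SUBMITTED and COMPLETED in the flow above are informal labels, not API values. `wait_for_query` and `FakeAthenaClient` are hypothetical names; in the Lambda you would pass `boto3.client('athena')` instead of the fake. Because a retried query can go back to QUEUED, the loop treats anything that is not a final state as "keep waiting".

```python
import time

# Final values of QueryExecution.Status.State; QUEUED and RUNNING are intermediate.
FINAL_STATES = ("SUCCEEDED", "FAILED", "CANCELLED")


def wait_for_query(client, execution_id, max_polls=5, delay=10):
    """Poll get_query_execution() until the query reaches a final state.

    Returns the final state, or None if max_polls runs out while the query
    is still QUEUED/RUNNING (or no state was reported).
    """
    for _ in range(max_polls):
        response = client.get_query_execution(QueryExecutionId=execution_id)
        state = response.get("QueryExecution", {}).get("Status", {}).get("State")
        if state in FINAL_STATES:
            return state
        # QUEUED or RUNNING (including a query re-queued after a retry): wait and poll again.
        time.sleep(delay)
    return None


class FakeAthenaClient:
    """Hypothetical stand-in for boto3.client('athena') that replays a fixed state sequence."""

    def __init__(self, states):
        self._states = iter(states)

    def get_query_execution(self, QueryExecutionId):
        return {"QueryExecution": {"QueryExecutionId": QueryExecutionId,
                                   "Status": {"State": next(self._states)}}}


if __name__ == "__main__":
    fake = FakeAthenaClient(["QUEUED", "QUEUED", "RUNNING", "SUCCEEDED"])
    print(wait_for_query(fake, "example-id", delay=0))  # prints SUCCEEDED
```

With the defaults (5 polls, 10 s apart) the total wait is capped at roughly 50 s, so if queries now spend time QUEUED you may also need to raise `max_polls`, within the Lambda's configured timeout.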

Upvotes: 3

shuvalov

Take a look at the Athena hook in Apache Airflow. Athena has final states (SUCCEEDED, FAILED and CANCELLED) and intermediate states (RUNNING and QUEUED). QUEUED is the normal state of a query before it has started, so you could use code like this:

import csv
import json
import time

import boto3
import botocore.exceptions

client = boto3.client('athena')
s3 = boto3.resource('s3')
# BUCKET, the bucket that s3_output points into, is defined elsewhere in the module.

def run_query(query, database, s3_output, max_execution=5):
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': s3_output
        })

    execution_id = response['QueryExecutionId']
    print("QueryExecutionId = " + str(execution_id))
    state = 'QUEUED'

    # QUEUED and RUNNING are intermediate states: keep polling while in either.
    while max_execution > 0 and state in ['RUNNING', 'QUEUED']:
        max_execution = max_execution - 1
        print("maxexecution=" + str(max_execution))
        response = client.get_query_execution(QueryExecutionId=execution_id)

        if 'QueryExecution' in response and \
                'Status' in response['QueryExecution'] and \
                'State' in response['QueryExecution']['Status']:

            state = response['QueryExecution']['Status']['State']
            print(state)
            if state == 'SUCCEEDED':
                print("Query SUCCEEDED: {}".format(execution_id))

                # Results are expected at athena_output/<execution_id>.csv in BUCKET.
                s3_key = 'athena_output/' + execution_id + '.csv'
                print(s3_key)
                local_filename = '/tmp/' + execution_id + '.csv'
                print(local_filename)

                rows = []
                try:
                    print("s3key =" + s3_key)
                    print("localfilename = " + local_filename)
                    s3.Bucket(BUCKET).download_file(s3_key, local_filename)
                    with open(local_filename) as csvfile:
                        reader = csv.DictReader(csvfile)
                        for row in reader:
                            rows.append(row)
                except botocore.exceptions.ClientError as e:
                    if e.response['Error']['Code'] == "404":
                        print("The object does not exist.")
                        print(e)
                    else:
                        raise
                return json.dumps(rows)
            elif state == 'FAILED' or state == 'CANCELLED':
                # Final, unsuccessful states: stop polling.
                return False
        time.sleep(10)
    return False
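Both versions rebuild the result key as 'athena_output/' + execution_id + '.csv' inside BUCKET, which only holds if s3_output points at that prefix of that bucket. A sketch that instead reads the location Athena reports for the finished query, assuming you pass in the dict returned by get_query_execution() once the state is SUCCEEDED; `result_location` is a hypothetical helper name.

```python
from urllib.parse import urlparse


def result_location(query_execution_response):
    """Return (bucket, key) of the results file Athena reports for a query.

    Expects the dict returned by get_query_execution(); for a finished query,
    ResultConfiguration.OutputLocation holds the full s3:// URI of the file.
    """
    uri = query_execution_response["QueryExecution"]["ResultConfiguration"]["OutputLocation"]
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError("expected an s3:// URI, got %r" % uri)
    # netloc is the bucket; the path (minus its leading slash) is the object key.
    return parsed.netloc, parsed.path.lstrip("/")


if __name__ == "__main__":
    sample = {"QueryExecution": {"ResultConfiguration": {
        "OutputLocation": "s3://example-bucket/athena_output/abc123.csv"}}}
    print(result_location(sample))  # prints ('example-bucket', 'athena_output/abc123.csv')
```

Inside run_query, the returned pair would take the place of BUCKET and s3_key, e.g. `s3.Bucket(bucket).download_file(key, local_filename)`.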

Upvotes: 4
