Reputation: 81
My Athena queries appear to be too short in their results. Trying to figure out Why?
Setup:
Glue Catalogs (118.6 Gig in size). Data: Stored in S3 in both CSV and JSON format. Athena Query: When I query data for a whole table, I only get 40K results per Query, there should be 121Million Records for that query on average for one month's data.
Does Athena Cap query result data? Is this a service limit (the documentation does not suggest this to be the case).
Upvotes: 8
Views: 12241
Reputation: 101
import time
import json
import boto3
import pandas as pd
from datetime import datetime, timedelta, date
res = client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': ' ' # Replace with your actual database
},
ResultConfiguration={
'OutputLocation': ' ', # Replace with your actual S3 output location aws-athena-query-results-129137241730-eu-west-1
}
)
while True:
# Get the query execution status
execution = client.get_query_execution(QueryExecutionId=res['QueryExecutionId'])
status = execution['QueryExecution']['Status']['State']
if status == 'SUCCEEDED':
# If the query execution succeeded, get the query results
response = client.get_query_results(QueryExecutionId=res['QueryExecutionId'])
# Fetch all the rows
while 'NextToken' in response:
next_response = client.get_query_results(QueryExecutionId=res['QueryExecutionId'], NextToken=response['NextToken'])
response['ResultSet']['Rows'].extend(next_response['ResultSet']['Rows'])
if 'NextToken' in next_response:
response['NextToken'] = next_response.get('NextToken')
else:
break
# Process the results in JSON FORMAT --> WILL have to parse to pd dataframe
resultados = response
elif status == 'FAILED' or status == 'CANCELLED':
# If the query execution failed or was cancelled, raise an exception
raise Exception('Query execution failed or was cancelled')
else:
# If the query execution is still running, wait for a while before checking the status again
time.sleep(5)
Upvotes: 2
Reputation: 29237
Another option is Paginate and count approach : Don't know whether better way to do it like select count(*) from table like...
Here is the complete example code ready to use. Used python boto3 athena api
I used paginator
and converted result as list of dict and also returning count along with the result.
below are 2 methods First one will paginate second one will convert paginated result to list of dict and calculate count.
Note : converting in to list of dict
is not necessary in this case. If you don't want that.. in the code you can modify to have only count
def get_athena_results_paginator(params, athena_client):
"""
:param params:
:param athena_client:
:return:
"""
query_id = athena_client.start_query_execution(
QueryString=params['query'],
QueryExecutionContext={
'Database': params['database']
}
# ,
# ResultConfiguration={
# 'OutputLocation': 's3://' + params['bucket'] + '/' + params['path']
# }
, WorkGroup=params['workgroup']
)['QueryExecutionId']
query_status = None
while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
query_status = athena_client.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
if query_status == 'FAILED' or query_status == 'CANCELLED':
raise Exception('Athena query with the string "{}" failed or was cancelled'.format(params.get('query')))
time.sleep(10)
results_paginator = athena_client.get_paginator('get_query_results')
results_iter = results_paginator.paginate(
QueryExecutionId=query_id,
PaginationConfig={
'PageSize': 1000
}
)
count, results = result_to_list_of_dict(results_iter)
return results, count
def result_to_list_of_dict(results_iter):
"""
:param results_iter:
:return:
"""
results = []
column_names = None
count = 0
for results_page in results_iter:
print(len(list(results_iter)))
for row in results_page['ResultSet']['Rows']:
count = count + 1
column_values = [col.get('VarCharValue', None) for col in row['Data']]
if not column_names:
column_names = column_values
else:
results.append(dict(zip(column_names, column_values)))
return count, results
Upvotes: 2
Reputation: 675
So, getting 1000 results at a time obviously doesn't scale. Thankfully, there's a simple workaround. (Or maybe this is how it was supposed to be done all along.)
When you run an Athena query, you should get a QueryExecutionId. This Id corresponds to the output file you'll find in S3.
Here's a snippet I wrote:
s3 = boto3.resource("s3")
athena = boto3.client("athena")
response: Dict = athena.start_query_execution(QueryString=query, WorkGroup="<your_work_group>")
execution_id: str = response["QueryExecutionId"]
print(execution_id)
# Wait until the query is finished
while True:
try:
athena.get_query_results(QueryExecutionId=execution_id)
break
except botocore.exceptions.ClientError as e:
time.sleep(5)
local_filename: str = "temp/athena_query_result_temp.csv"
s3.Bucket("athena-query-output").download_file(execution_id + ".csv", local_filename)
return pd.read_csv(local_filename)
Make sure the corresponding WorkGroup has "Query result location" set, e.g. "s3://athena-query-output/"
Also see this thread with similar answers: How to Create Dataframe from AWS Athena using Boto3 get_query_results method
Upvotes: 6
Reputation: 4482
It seems that there is a limit of 1000.
You should use NextToken
to iterate over the results.
Quote of the GetQueryResults Documentation
MaxResults The maximum number of results (rows) to return in this request.
Type: Integer
Valid Range: Minimum value of 0. Maximum value of 1000.
Required: No
Upvotes: 4