Reputation: 1148
I'm using AWS Athena to query raw data from S3. Since Athena writes the query output to an S3 output bucket, I used to do:
df = pd.read_csv(OutputLocation)
But this seems like an expensive way. Recently I noticed the get_query_results method of boto3, which returns a complex dictionary of the results:
client = boto3.client('athena')
response = client.get_query_results(
QueryExecutionId=res['QueryExecutionId']
)
I'm facing two main issues:

1. How can I format the results of get_query_results into a pandas data frame?
2. get_query_results only returns 1000 rows. How can I use it to get two million rows?

Upvotes: 32
Views: 61683
Reputation: 182
While the previously given answers are valid in their own sense, they do not really answer the original question, which is "How can I format the results of get_query_results into a pandas data frame?"
If you do not want to rerun the query or dump the results to CSV, but want to create a Pandas dataframe directly from the result object at hand, you can do so as follows. This works if the 1000-row limit of get_query_results is not an issue for you, or if you use its pagination to build up the complete result first:
# 'result' is the list of rows, i.e. response['ResultSet']['Rows']
columns = [item['VarCharValue'] for item in result[0]['Data']]
values = [[item.get('VarCharValue', '') for item in row['Data']] for row in result[1:]]
df = pd.DataFrame(values, columns=columns)
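If you do need pagination, a minimal sketch along these lines can build up the complete result list first (query_execution_id is a placeholder for your finished query's execution ID); the snippet above then applies unchanged:

import boto3

client = boto3.client('athena')

# collect every page of rows into one list (each call returns at most 1000 rows)
result = []
kwargs = {'QueryExecutionId': query_execution_id}
while True:
    response = client.get_query_results(**kwargs)
    result.extend(response['ResultSet']['Rows'])
    if 'NextToken' not in response:
        break
    kwargs['NextToken'] = response['NextToken']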
Having said that, it might be a good idea to check the size of the result object beforehand. The following gives its size in MB:
import sys
sys.getsizeof(result) / 1e6
and keep in mind that the Pandas dataframe will be of similar size, so you need at least that much free memory before loading the result into a dataframe.
If you have barely enough memory, deleting the result object after creating the dataframe will free some of it up:
del result
Upvotes: 1
Reputation: 2045
get_query_results only returns 1000 rows. How can I use it to get two million rows into a Pandas dataframe?
If you try to add:
client.get_query_results(QueryExecutionId=res['QueryExecutionId'], MaxResults=2000)
You will get the following error:
An error occurred (InvalidRequestException) when calling the GetQueryResults operation: MaxResults is more than maximum allowed length 1000.
You can obtain millions of rows if you read the output file directly from your S3 bucket (in the following example, into a Pandas dataframe):
def obtain_data_from_s3(self):
self.resource = boto3.resource('s3',
region_name = self.region_name,
aws_access_key_id = self.aws_access_key_id,
aws_secret_access_key= self.aws_secret_access_key)
response = self.resource \
.Bucket(self.bucket) \
.Object(key= self.folder + self.filename + '.csv') \
.get()
return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')
The self.filename can be:
self.filename = response['QueryExecutionId'] + ".csv"
because Athena names the output file after the QueryExecutionId. Below is my full code, which takes a query and returns a dataframe with all the rows and columns.
import time
import boto3
import pandas as pd
import io
class QueryAthena:
def __init__(self, query, database):
self.database = database
self.folder = 'my_folder/'
self.bucket = 'my_bucket'
self.s3_input = 's3://' + self.bucket + '/my_folder_input'
self.s3_output = 's3://' + self.bucket + '/' + self.folder
self.region_name = 'us-east-1'
self.aws_access_key_id = "my_aws_access_key_id"
self.aws_secret_access_key = "my_aws_secret_access_key"
self.query = query
def load_conf(self, q):
try:
self.client = boto3.client('athena',
region_name = self.region_name,
aws_access_key_id = self.aws_access_key_id,
aws_secret_access_key= self.aws_secret_access_key)
response = self.client.start_query_execution(
QueryString = q,
QueryExecutionContext={
'Database': self.database
},
ResultConfiguration={
'OutputLocation': self.s3_output,
}
)
self.filename = response['QueryExecutionId']
print('Execution ID: ' + response['QueryExecutionId'])
return response
except Exception as e:
print(e)
def run_query(self):
queries = [self.query]
for q in queries:
res = self.load_conf(q)
try:
query_status = None
while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
query_status = self.client.get_query_execution(QueryExecutionId=res["QueryExecutionId"])['QueryExecution']['Status']['State']
print(query_status)
if query_status == 'FAILED' or query_status == 'CANCELLED':
raise Exception('Athena query with the string "{}" failed or was cancelled'.format(self.query))
time.sleep(10)
print('Query "{}" finished.'.format(self.query))
df = self.obtain_data()
return df
except Exception as e:
print(e)
def obtain_data(self):
try:
self.resource = boto3.resource('s3',
region_name = self.region_name,
aws_access_key_id = self.aws_access_key_id,
aws_secret_access_key= self.aws_secret_access_key)
response = self.resource \
.Bucket(self.bucket) \
.Object(key= self.folder + self.filename + '.csv') \
.get()
return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')
except Exception as e:
print(e)
if __name__ == "__main__":
query = "SELECT * FROM bucket.folder"
qa = QueryAthena(query=query, database='myAthenaDb')
dataframe = qa.run_query()
Upvotes: 39
Reputation: 1018
While not the intent of the question, a CSV with a few million rows may only be a few hundred MB.
Downloading the query results as CSV from the AWS console and then loading them into pandas with pandas.read_csv()
is probably faster and easier than implementing one of the above solutions. It does not scale as well, but the OP only asked for 2 million rows; I have used this successfully on files twice that size.
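A minimal sketch of that last step, assuming the downloaded file was saved locally as query_results.csv (a hypothetical filename):

import pandas as pd

# read the CSV downloaded from the Athena console
df = pd.read_csv('query_results.csv')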
Upvotes: 0
Reputation: 616
You can use the AWS SDK for pandas (awswrangler) to create a pandas data frame directly by querying through Athena.
import awswrangler as wr
df = wr.athena.read_sql_query(sql="SELECT * FROM <table_name_in_Athena>", database="<database_name>")
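For results that do not fit comfortably in memory, a memory-friendly variant is to request chunked dataframes and process them one at a time; a sketch (table and database names are placeholders, and process() stands in for your own handling):

import awswrangler as wr

# chunksize=True returns an iterator of DataFrames instead of one large frame
chunks = wr.athena.read_sql_query(
    sql="SELECT * FROM <table_name_in_Athena>",
    database="<database_name>",
    chunksize=True,
)
for chunk in chunks:
    process(chunk)  # hypothetical per-chunk handling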
You can find more information here
Upvotes: 41
Reputation: 1148
I have a solution for my first question, using the following function
def results_to_df(results):
columns = [
col['Label']
for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']
]
listed_results = []
for res in results['ResultSet']['Rows'][1:]:
values = []
for field in res['Data']:
try:
values.append(list(field.values())[0])
except:
values.append(list(' '))
listed_results.append(
dict(zip(columns, values))
)
return listed_results
and then:
t = results_to_df(response)
pd.DataFrame(t)
As for my 2nd question, and following the request of @EricBellet, I'm also adding my approach for pagination, which I find inefficient and slower compared to loading the results from the Athena output in S3:
def run_query(query, database, s3_output):
'''
Function for executing Athena queries and return the query ID
'''
client = boto3.client('athena')
response = client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': database
},
ResultConfiguration={
'OutputLocation': s3_output,
}
)
print('Execution ID: ' + response['QueryExecutionId'])
return response
def format_result(results):
'''
This function formats the results into the shape needed for appending.
'''
columns = [
col['Label']
for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']
]
formatted_results = []
for result in results['ResultSet']['Rows'][0:]:
values = []
for field in result['Data']:
try:
values.append(list(field.values())[0])
except:
values.append(list(' '))
formatted_results.append(
dict(zip(columns, values))
)
return formatted_results
res = run_query(query_2, database, s3_output) # query Athena
import sys
import time
import boto3
import pandas as pd

client = boto3.client('athena')
marker = None
formatted_results = []
query_id = res['QueryExecutionId']
i = 0
start_time = time.time()
while True:
paginator = client.get_paginator('get_query_results')
response_iterator = paginator.paginate(
QueryExecutionId=query_id,
PaginationConfig={
'MaxItems': 1000,
'PageSize': 1000,
'StartingToken': marker})
for page in response_iterator:
i = i + 1
format_page = format_result(page)
        if i == 1:
            formatted_results = pd.DataFrame(format_page)
        elif i > 1:
            # DataFrame.append was removed in recent pandas; concat is the replacement
            formatted_results = pd.concat([formatted_results, pd.DataFrame(format_page)])
try:
marker = page['NextToken']
except KeyError:
break
print ("My program took", time.time() - start_time, "to run")
It's not formatted so well, but I think it does the job...
2021 Update
Today I'm using a custom wrapper around aws-data-wrangler as the best solution for the original question I asked several years ago.
import boto3
import awswrangler as wr
def run_athena_query(query, database, s3_output, boto3_session=None, categories=None, chunksize=None, ctas_approach=None, profile=None, workgroup='myTeamName', region_name='us-east-1', keep_files=False, max_cache_seconds=0):
"""
An end 2 end Athena query method, based on the AWS Wrangler package.
The method will execute a query and will return a pandas dataframe as an output.
you can read more in https://aws-data-wrangler.readthedocs.io/en/stable/stubs/awswrangler.athena.read_sql_query.html
Args:
- query: SQL query.
- database (str): AWS Glue/Athena database name - this is only the database from which the query will be launched. You can still use and mix several databases by writing the full table name within the SQL (e.g. database.table).
- ctas_approach (bool): Wraps the query using a CTAS and reads the resulting Parquet data from S3. If False, reads the regular CSV from S3.
- categories (List[str], optional): List of column names that should be returned as pandas.Categorical. Recommended for memory-restricted environments.
- chunksize (Union[int, bool], optional): If passed, will split the data into an iterable of DataFrames (memory friendly). If True, wrangler will iterate over the data by files in the most efficient way without a guaranteed chunk size. If an integer is passed, wrangler will iterate over the data by a number of rows equal to the received integer.
- s3_output (str, optional): Amazon S3 path.
- workgroup (str, optional): Athena workgroup.
- keep_files (bool): Should Wrangler delete or keep the staging files produced by Athena? default is False
- profile (str, optional): AWS account profile. If boto3_session is provided, profile will be ignored.
- boto3_session (boto3.Session(), optional): Boto3 session. The default boto3 session will be used if boto3_session is None. If profile is provided, a session will automatically be created.
- max_cache_seconds (int): Wrangler can look up in Athena's history whether this query has been run before. If so, and its completion time is less than max_cache_seconds before now, wrangler skips query execution and just returns the same results as last time. If reading cached data fails for any reason, execution falls back to the usual query run path. Default is 0.
Returns:
- Pandas DataFrame
"""
# test for boto3 session and profile.
    if boto3_session is None and profile is not None:
boto3_session = boto3.Session(profile_name=profile, region_name=region_name)
print("Querying AWS Athena...")
try:
# Retrieving the data from Amazon Athena
athena_results_df = wr.athena.read_sql_query(
query,
database=database,
boto3_session=boto3_session,
categories=categories,
chunksize=chunksize,
ctas_approach=ctas_approach,
s3_output=s3_output,
workgroup=workgroup,
keep_files=keep_files,
max_cache_seconds=max_cache_seconds
)
print("Query completed, data retrieved successfully!")
except Exception as e:
print(f"Something went wrong... the error is:{e}")
raise Exception(e)
return athena_results_df
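A quick usage sketch (the query, database, bucket and profile names below are placeholders):

df = run_athena_query(
    query="SELECT * FROM my_database.my_table",
    database="my_database",
    s3_output="s3://my_bucket/athena-results/",
    ctas_approach=True,
    profile="my_profile",
)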
you can read more here
Upvotes: 22
Reputation: 21
I've used a while-loop approach to solve this: as long as NextToken is present, I extend the list of rows:
# Receive Query Results
# Method get_query_results() limits to max 1000, handled with while, and called NextToken.
query_results = athena_client.get_query_results(QueryExecutionId=execution_response['QueryExecutionId'])
results = query_results['ResultSet']['Rows']
while 'NextToken' in query_results:
query_results = athena_client.get_query_results(QueryExecutionId=execution_response['QueryExecutionId'], NextToken = query_results['NextToken'])
results.extend(query_results['ResultSet']['Rows'])
return results
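To turn those rows into a dataframe, a small sketch in the spirit of the other answers (remember that the first row Athena returns is the header):

import pandas as pd

# results is the full list of rows collected above; row 0 holds the column names
columns = [col['VarCharValue'] for col in results[0]['Data']]
data = [[cell.get('VarCharValue', '') for cell in row['Data']] for row in results[1:]]
df = pd.DataFrame(data, columns=columns)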
Upvotes: 2
Reputation: 1
Try this approach to convert response['records'] into a dataframe using columnMetadata:
def results_to_df(response):
columns = [
col['label']
for col in response['columnMetadata']
]
    listed_results = [
        [list(col.values())[0] if list(col.values())[0] else '' for col in record]
        for record in response['records']
    ]
df = pd.DataFrame(listed_results, columns=columns)
return df
Upvotes: 0
Reputation: 463
Maybe you can try to use pandas read_sql and pyathena:
from pyathena import connect
import pandas as pd
conn = connect(s3_staging_dir='s3://bucket/folder',region_name='region')
df = pd.read_sql('select * from database.table', conn) #don't change the "database.table"
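If the result set is large, PyAthena also ships a pandas-oriented cursor that reads the query output straight from S3; a sketch assuming a recent PyAthena version (2.x or later), with the staging dir and region as placeholders:

from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

cursor = connect(s3_staging_dir='s3://bucket/folder',
                 region_name='region',
                 cursor_class=PandasCursor).cursor()
df = cursor.execute('select * from database.table').as_pandas()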
Upvotes: 4
Reputation: 129
A very simple solution is to use a list comprehension with the boto3 Athena paginator. The list comprehension can then simply be passed into pd.DataFrame() to create a DataFrame, like so:
pd.DataFrame([[data.get('VarCharValue') for data in row['Data']] for row in
results['ResultSet']['Rows']])
import pandas as pd
import boto3
result = get_query_results( . . . ) # your code here
def cleanQueryResult(result) :
'''
This will take the dictionary of the raw Boto3 Athena results and turn it into a
2D array for further processing
Parameters
----------
result dict
The dictionary from the boto3 Athena client function get_query_results
Returns
-------
list(list())
2D list which is essentially the table result. The first row is the column name.
'''
return [[data.get('VarCharValue') for data in row['Data']]
for row in result['ResultSet']['Rows']]
# note that row 1 is the header
df = pd.DataFrame(cleanQueryResult(result))
This requires the paginator object: https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/athena.html#paginators
As a hint, here's how you can append the rows from each page (df.append has since been removed from pandas, so pd.concat is used instead):

df = pd.concat([df, pd.DataFrame(cleanQueryResult(next_page))], ignore_index=True)
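Putting it together, a minimal sketch of the full pagination loop (query_execution_id is a placeholder for your finished query's execution ID):

import boto3
import pandas as pd

client = boto3.client('athena')
paginator = client.get_paginator('get_query_results')

frames = []
for page in paginator.paginate(QueryExecutionId=query_execution_id):
    frames.append(pd.DataFrame(cleanQueryResult(page)))

df = pd.concat(frames, ignore_index=True)
# only the first page contains the header row, so promote it to column names
df.columns = df.iloc[0]
df = df.drop(0).reset_index(drop=True)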
Upvotes: 11