Reputation: 79
I'm using AWS Athena to query raw data from S3. Since Athena writes the query output to an S3 output bucket, I am using a Lambda function to load the result of the Athena query into a DataFrame:
My code:
def athena_query_to_dataframe(db, s3Bucket, query):
    import boto3
    import pandas as pd
    client = boto3.client('athena')
    listOfStatus = ['SUCCEEDED', 'FAILED', 'CANCELLED']
    listOfInitialStatus = ['RUNNING', 'QUEUED']
    print('Starting Query Execution:')
    tempS3Path = 's3://{}'.format(s3Bucket)
    response = client.start_query_execution(
        QueryString = query,
        QueryExecutionContext = {
            'Database': db
        },
        ResultConfiguration = {
            'OutputLocation': tempS3Path,
        }
    )
    queryExecutionId = response['QueryExecutionId']
    status = client.get_query_execution(QueryExecutionId = queryExecutionId)['QueryExecution']['Status']['State']
    while status in listOfInitialStatus:
        status = client.get_query_execution(QueryExecutionId = queryExecutionId)['QueryExecution']['Status']['State']
        if status in listOfStatus:
            if status == 'SUCCEEDED':
                print('Query Succeeded!')
                paginator = client.get_paginator('get_query_results')
                query_results = paginator.paginate(
                    QueryExecutionId = queryExecutionId,
                    PaginationConfig = {'PageSize': 1000}
                )
            elif status == 'FAILED':
                print('Query Failed!')
            elif status == 'CANCELLED':
                print('Query Cancelled!')
            break
    results = []
    rows = []
    print('Processing Response')
    for page in query_results:
        for row in page['ResultSet']['Rows']:
            rows.append(row['Data'])
    columns = rows[0]
    rows = rows[1:]
    columns_list = []
    for column in columns:
        columns_list.append(column['VarCharValue'])
    print('Creating Dataframe')
    dataframe = pd.DataFrame(columns = columns_list)
    for row in rows:
        df_row = []
        try:
            for data in row:
                df_row.append(data['VarCharValue'])
            dataframe.loc[len(dataframe)] = df_row
        except:
            pass
When I try to return df.shape I only get (0, 20), which means the DataFrame has its 20 columns but no rows are being added to it.
I am looking for below output:
Upvotes: 1
Views: 3217
Reputation: 177
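You can also use the pandas built-in function pd.read_sql(), since awswrangler needs several permissions to execute. Note that pd.read_sql() requires a connection object as its second argument. The example below assumes the PyAthena package for that connection; the staging location and region are placeholders:
import pandas as pd
from pyathena import connect
# Placeholder staging location and region; replace with your own values.
conn = connect(s3_staging_dir="s3://your-bucket/path/", region_name="us-east-1")
df = pd.read_sql("select * from db_name.table_name", conn)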
Upvotes: 0
Reputation: 2358
The simplest answer is to use awswrangler.
import awswrangler as wr
df = wr.athena.read_sql_query(sql="SELECT * FROM my_athena_table", database="my_database")
Substitute my_athena_table and my_database with your values.
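If you would rather keep the boto3 paginator from the question, one likely cause of the (0, 20) shape is the bare except: Athena returns a NULL cell as an empty dict with no 'VarCharValue' key, so any row containing a NULL raises a KeyError and is silently skipped. That is a guess from the posted code, not something the question confirms. Here is a minimal sketch of the parsing step under that assumption; the function names and the sample data are made up for illustration:

```python
def pages_to_table(pages):
    """Flatten get_query_results pages into (columns, records).

    Assumes the first row of the first page is the header row, as is
    the case for the SELECT results handled in the question.
    """
    rows = []
    for page in pages:
        for row in page["ResultSet"]["Rows"]:
            # A NULL cell comes back as an empty dict, so use .get()
            # rather than indexing; missing values become None.
            rows.append([cell.get("VarCharValue") for cell in row["Data"]])
    if not rows:
        return [], []
    return rows[0], rows[1:]


def pages_to_dataframe(pages):
    """Build the DataFrame in one call instead of appending row by row."""
    import pandas as pd  # imported here so pages_to_table needs no pandas

    columns, records = pages_to_table(pages)
    return pd.DataFrame(records, columns=columns)


# Hypothetical sample shaped like one get_query_results page.
sample_pages = [
    {"ResultSet": {"Rows": [
        {"Data": [{"VarCharValue": "id"}, {"VarCharValue": "name"}]},
        {"Data": [{"VarCharValue": "1"}, {"VarCharValue": "alice"}]},
        {"Data": [{"VarCharValue": "2"}, {}]},  # NULL in the name column
    ]}}
]
columns, records = pages_to_table(sample_pages)
```

In the question's function you would pass query_results to pages_to_dataframe. Every value arrives as a string, so cast columns afterwards if you need numeric types.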
Upvotes: 2