RagePwn

Reputation: 421

Unable to read Athena query into pandas dataframe

I have the code below and want it to return a dataframe properly. The polling logic works, but the dataframe doesn't seem to get created or returned; right now the function just returns None when called.

import boto3
import pandas as pd
import io
import re
import time

AK='mykey'
SAK='mysecret'

params = {
    'region': 'us-west-2',
    'database': 'default',
    'bucket': 'my-bucket',
    'path': 'dailyreport',
    'query': 'SELECT * FROM v_daily_report LIMIT 100'
}

session = boto3.Session(aws_access_key_id=AK,aws_secret_access_key=SAK)


def athena_query(client, params):

    response = client.start_query_execution(
        QueryString=params["query"],
        QueryExecutionContext={
            'Database': params['database']
        },
        ResultConfiguration={
            'OutputLocation': 's3://' + params['bucket'] + '/' + params['path']
        }
    )
    return response


def athena_to_s3(session, params, max_execution = 5):
    client = session.client('athena', region_name=params["region"])
    execution = athena_query(client, params)
    execution_id = execution['QueryExecutionId']
    df = poll_status(execution_id, client)
    return df

def poll_status(_id, client):
    '''
    poll query status
    '''
    result = client.get_query_execution(
        QueryExecutionId = _id
    )

    state = result['QueryExecution']['Status']['State']
    if state == 'SUCCEEDED':
        print(state)
        print(str(result))
        s3_key = 's3://' + params['bucket'] + '/' + params['path']+'/'+ _id + '.csv'
        print(s3_key)
        df = pd.read_csv(s3_key)
        return df
    elif state == 'QUEUED':
        print(state)
        print(str(result))
        time.sleep(1)
        poll_status(_id, client)
    elif state == 'RUNNING':
        print(state)
        print(str(result))
        time.sleep(1)
        poll_status(_id, client)
    elif state == 'FAILED':
        return result
    else:
        print(state)
        raise Exception


df_data = athena_to_s3(session, params)

print(df_data)

I plan to move the dataframe load out of the polling function, but just trying to get it to work as is right now.
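For reference, an iterative rewrite of the polling function along these lines returns the dataframe (or raises) from every path, instead of recursing without a return. `poll_to_dataframe` is a placeholder name, and reading the CSV straight from S3 with pandas assumes `s3fs` is installed:

```python
import time
import pandas as pd

def poll_to_dataframe(client, execution_id, params, max_attempts=30):
    """Poll Athena until the query finishes, then load the result CSV.

    Iterative version of poll_status: every branch either returns a
    value or raises, so the caller can never receive None.
    """
    for _ in range(max_attempts):
        result = client.get_query_execution(QueryExecutionId=execution_id)
        state = result['QueryExecution']['Status']['State']
        if state == 'SUCCEEDED':
            # Athena writes results to <OutputLocation>/<QueryExecutionId>.csv
            s3_key = f"s3://{params['bucket']}/{params['path']}/{execution_id}.csv"
            return pd.read_csv(s3_key)  # requires s3fs for s3:// paths
        if state in ('FAILED', 'CANCELLED'):
            reason = result['QueryExecution']['Status'].get('StateChangeReason', '')
            raise RuntimeError(f'Query {state}: {reason}')
        # QUEUED or RUNNING: wait and re-check
        time.sleep(1)
    raise TimeoutError('Query did not finish within the polling window')
```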

Upvotes: 1

Views: 5576

Answers (3)

Guy

Reputation: 12901

I recommend taking a look at AWS Wrangler instead of the traditional boto3 Athena API. It is a newer, more data-focused interface to AWS services, including Athena queries, and provides more functionality.

import awswrangler as wr

df = wr.pandas.read_sql_query(
    sql="select * from table",
    database="database"
)

Thanks to @RagePwn's comment, it is also worth checking PyAthena as an alternative to boto3 for querying Athena.

Upvotes: 2

Zephaniah Grunschlag

Reputation: 1083

Just to elaborate on RagePwn's suggestion of using PyAthena - that's what I ultimately did as well. For some reason AWS Wrangler choked on me and couldn't handle the JSON that was being returned from S3. Here's the code snippet that worked for me, based on PyAthena's PyPI page:

import os
from pyathena import connect
from pyathena.util import as_pandas


aws_access_key_id = os.getenv('ATHENA_ACCESS_KEY')
aws_secret_access_key = os.getenv('ATHENA_SECRET_KEY')
region_name = os.getenv('ATHENA_REGION_NAME')
staging_bucket_dir = os.getenv('ATHENA_STAGING_BUCKET')

cursor = connect(aws_access_key_id=aws_access_key_id,
                 aws_secret_access_key=aws_secret_access_key,
                 region_name=region_name,
                 s3_staging_dir=staging_bucket_dir,
                ).cursor()
cursor.execute(sql)  # sql is your query string, e.g. 'SELECT * FROM my_table'
df = as_pandas(cursor)

The above assumes you have defined as environment variables the following:

  • ATHENA_ACCESS_KEY: the AWS access key id for your AWS account
  • ATHENA_SECRET_KEY: the AWS secret key
  • ATHENA_REGION_NAME: the AWS region name
  • ATHENA_STAGING_BUCKET: a bucket in the same account that has the correct access settings (explanation of which is outside the scope of this answer)
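For example, the variables could be exported in the shell before launching the script (all values here are placeholders to substitute with your own credentials, region, and staging bucket):

```shell
# Placeholder values - replace with your own before running
export ATHENA_ACCESS_KEY='AKIAEXAMPLE'
export ATHENA_SECRET_KEY='examplesecret'
export ATHENA_REGION_NAME='us-west-2'
export ATHENA_STAGING_BUCKET='s3://my-athena-results/staging/'
```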

Upvotes: 1

Eric Truett

Reputation: 3010

If it is returning None, it is because the recursive calls in the 'QUEUED' and 'RUNNING' branches of poll_status do not return their result. It is also worth checking whether the query ended in the 'FAILED' state; if so, the reason will be in 'StateChangeReason' of the get_query_execution response:

{
    'QueryExecution': {
        'QueryExecutionId': 'string',
        'Query': 'string',
        'StatementType': 'DDL'|'DML'|'UTILITY',
        'ResultConfiguration': {
            'OutputLocation': 'string',
            'EncryptionConfiguration': {
                'EncryptionOption': 'SSE_S3'|'SSE_KMS'|'CSE_KMS',
                'KmsKey': 'string'
            }
        },
        'QueryExecutionContext': {
            'Database': 'string'
        },
        'Status': {
            'State': 'QUEUED'|'RUNNING'|'SUCCEEDED'|'FAILED'|'CANCELLED',
            'StateChangeReason': 'string',
            'SubmissionDateTime': datetime(2015, 1, 1),
            'CompletionDateTime': datetime(2015, 1, 1)
        },
        'Statistics': {
            'EngineExecutionTimeInMillis': 123,
            'DataScannedInBytes': 123,
            'DataManifestLocation': 'string',
            'TotalExecutionTimeInMillis': 123,
            'QueryQueueTimeInMillis': 123,
            'QueryPlanningTimeInMillis': 123,
            'ServiceProcessingTimeInMillis': 123
        },
        'WorkGroup': 'string'
    }
}
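As a sketch, given a response in the shape above (the dict below uses made-up mock values, including a hypothetical syntax error), you can surface the failure reason instead of silently passing the raw result back:

```python
# Hypothetical get_query_execution response with mock values
response = {
    'QueryExecution': {
        'QueryExecutionId': 'abc-123',
        'Status': {
            'State': 'FAILED',
            'StateChangeReason': 'SYNTAX_ERROR: line 1: Table v_daily_report does not exist',
        },
    }
}

status = response['QueryExecution']['Status']
if status['State'] == 'FAILED':
    # Surface the reason rather than returning the raw result dict
    reason = status.get('StateChangeReason', 'no reason given')
    print(f"Query failed: {reason}")
```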

Upvotes: 0
