Luthor_

Reputation: 79

Create dataframe using AWS Athena query and Boto3

I'm using AWS Athena to query raw data in S3. Since Athena writes the query output to an S3 output bucket, I am using a Lambda function to load the result of the Athena query into a dataframe:

My code:

def athena_query_to_dataframe(db, s3Bucket, query):
    
    import boto3
    import pandas as pd
    
    client = boto3.client('athena')
    listOfStatus = ['SUCCEEDED', 'FAILED', 'CANCELLED']
    listOfInitialStatus = ['RUNNING', 'QUEUED']
    
    print('Starting Query Execution:')
    
    tempS3Path = 's3://{}'.format(s3Bucket)
    
    response = client.start_query_execution(
        QueryString = query,
        QueryExecutionContext = {
            'Database': db
        },
        ResultConfiguration = {
            'OutputLocation': tempS3Path,
        }
    )

    queryExecutionId = response['QueryExecutionId']

    status = client.get_query_execution(QueryExecutionId = queryExecutionId)['QueryExecution']['Status']['State']

    while status in listOfInitialStatus:
        status = client.get_query_execution(QueryExecutionId = queryExecutionId)['QueryExecution']['Status']['State']
        if status in listOfStatus:
            if status == 'SUCCEEDED':
                print('Query Succeeded!')
                paginator = client.get_paginator('get_query_results')
                query_results = paginator.paginate(
                    QueryExecutionId = queryExecutionId,
                    PaginationConfig = {'PageSize': 1000}
                )
            elif status == 'FAILED':
                print('Query Failed!')
            elif status == 'CANCELLED':
                print('Query Cancelled!')
            break
    
    results = []
    rows = []
    
    print('Processing Response')
    
    for page in query_results:
        for row in page['ResultSet']['Rows']:
            rows.append(row['Data'])

    columns = rows[0]
    rows = rows[1:]

    columns_list = []
    for column in columns:
        columns_list.append(column['VarCharValue'])
        
    print('Creating Dataframe')

    dataframe = pd.DataFrame(columns = columns_list)

    for row in rows:
        df_row = []
        try:
            for data in row:
                df_row.append(data['VarCharValue'])
            dataframe.loc[len(dataframe)] = df_row
        except:
            pass

When I check the dataframe's shape I only get (0, 20), which means the columns are created but none of the rows are being added to the dataframe.

I am looking for the following:

  1. A fix for the above issue so that the rows are populated as well.
  2. A better approach to getting the dataframe, if there is one.

Upvotes: 1

Views: 3217

Answers (2)

SKY

Reputation: 177

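You can also use pandas' built-in pd.read_sql() function, since awswrangler needs several permissions to run. pd.read_sql() needs a database connection as its second argument; the snippet below assumes the PyAthena package for that, and the staging directory and region are placeholders to replace with your own values.

import pandas as pd
from pyathena import connect  # assumption: the PyAthena package is installed
conn = connect(s3_staging_dir="s3://your-bucket/path/", region_name="us-east-1")  # placeholders
df = pd.read_sql("select * from db_name.table_name", conn)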

Upvotes: 0

DaveR

Reputation: 2358

The simplest answer is to use awswrangler.

import awswrangler as wr  
df = wr.athena.read_sql_query(sql="SELECT * FROM my_athena_table", database="my_database")

Substitute my_athena_table and my_database with your values.
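
If you would rather keep the boto3 code from the question, one likely cause of the (0, 20) shape is NULL values: get_query_results returns a NULL cell as an empty dict with no 'VarCharValue' key, so data['VarCharValue'] raises a KeyError and the bare except silently drops the whole row. If every row has at least one NULL, no rows are added. The sketch below is only an illustration under that assumption; rows_from_pages and sample_pages are hypothetical names, and the sample data stands in for real paginator output.

```python
def rows_from_pages(pages):
    """Flatten get_query_results pages into (columns, records)."""
    cells = [
        # .get() returns None for NULL cells, which arrive as {} without 'VarCharValue'
        [cell.get("VarCharValue") for cell in row["Data"]]
        for page in pages
        for row in page["ResultSet"]["Rows"]
    ]
    if not cells:
        return [], []
    # As in the question's code, the first row is treated as the header row.
    return cells[0], cells[1:]


# Hypothetical sample shaped like one get_query_results page; {} marks a NULL cell.
sample_pages = [
    {"ResultSet": {"Rows": [
        {"Data": [{"VarCharValue": "id"}, {"VarCharValue": "name"}]},
        {"Data": [{"VarCharValue": "1"}, {}]},
        {"Data": [{"VarCharValue": "2"}, {"VarCharValue": "bob"}]},
    ]}}
]

columns, records = rows_from_pages(sample_pages)
print(columns)
print(records)
```

With the real paginator you would pass query_results instead of sample_pages and then build the frame in one call with pd.DataFrame(records, columns=columns), which also avoids the slow row-by-row dataframe.loc appends. Every value comes back as a string, so cast columns afterwards if you need numeric types. The function as posted also has no return statement, so add return dataframe at the end.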

Upvotes: 2
