Saisumanth Gopisetty
Saisumanth Gopisetty

Reputation: 946

How do I Configure file format of AWS Athena results

Currently, the Athena query results are in tsv format in S3. Is there any way to configure Athena queries to return results in Parquet format.

Upvotes: 10

Views: 7515

Answers (2)

Ehsan Fathi
Ehsan Fathi

Reputation: 708

Apparently now it's possible to do it. Take a look at this link.

UNLOAD (SELECT * FROM old_table) 
TO 's3://DOC-EXAMPLE-BUCKET/' 
WITH (format = 'PARQUET',compression = 'SNAPPY')

in case it makes it easier use a f-string:

main_query = 'SELECT * FROM old_table'
s3_address = 's3://DOC-EXAMPLE-BUCKET/' 
database = 'default' # please put your own database

query = f"""UNLOAD ({main_query}) 
TO {s3_address} 
WITH (format = 'PARQUET',compression = 'SNAPPY')"""

Please pay attention that:

The TO destination must specify a location in Amazon S3 that has no data. Before the UNLOAD query writes to the location specified, it verifies that the bucket location is empty. Because UNLOAD does not write data to the specified location if the location already has data in it, UNLOAD does not overwrite existing data. To reuse a bucket location as a destination for UNLOAD, delete the data in the bucket location, and then run the query again.

so make sure the destination is empty before you run the query.

# Execute the query
response = client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': DATABASE},
    ResultConfiguration={'OutputLocation': s3_address}
)

you also can get the QueryExecutionId this way:

    # Get the query execution ID
    query_execution_id = response['QueryExecutionId']

The QueryExecutionId is useful because Athena will save the CSV and metadata files with that name so you can use it to delete the extra files and clean the clutter.

Also to check if your query has been finished you can use something like this:

import time
def is_query_still_running(query_execution_id):
    response = client.get_query_execution(QueryExecutionId=query_execution_id)
    return response['QueryExecution']['Status']['State'] in ['QUEUED', 'RUNNING']


while is_query_still_running(query_execution_id):
    time.sleep(1)  # Wait for 1 seconds
print('Done!')

I don't know how to retrieve the name of the parquet file. of if there is a pattern to it. If anyone knows how to do that please leave a comment :)

Here is the whole shebang for convenience:

import boto3
import pandas as pd
import time


def list_s3_objects(s3, bucket, prefix):
    import itertools
    results = []
    # Start with empty continuation token
    continuation_token = None
    # Loop to paginate through the results
    while True:
        # Check if continuation token is present
        if continuation_token:
            response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, ContinuationToken=continuation_token)
        else:
            response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    
        # Process the current batch of objects
        if 'Contents' in response:
            results.append([obj['Key'] for obj in response.get("Contents", [])])

        # Check if more objects are available
        if response['IsTruncated']:
            # If so, prepare for the next iteration with the new token
            continuation_token = response['NextContinuationToken']
        else:
            # No more objects, exit loop
            break
    return list(itertools.chain.from_iterable(results))

def delete_files_in_s3_folder(bucket_name, folder_path, placeholder_file=None):
    """function to delete all the files in an s3 folder. placeholder_file won't be deleted """
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    folder_path = folder_path if folder_path.endswith('/') else folder_path + '/'

    # List and delete objects in the folder
    for obj in bucket.objects.filter(Prefix=folder_path):
        if placeholder_file is None or obj.key != folder_path + placeholder_file:
            obj.delete()
            print(f"Deleted: {obj.key}")

def is_query_still_running(client, query_execution_id):
    """Function to check if the query is still running"""
    response = client.get_query_execution(QueryExecutionId=query_execution_id)
    if response['QueryExecution']['Status']['State'] in ["FAILED", "CANCELLED"]:
        raise RuntimeError(f"Query was {esponse['QueryExecution']['Status']['State']}")
        
    return response['QueryExecution']['Status']['State'] in ['QUEUED', 'RUNNING']

def load_parquet_dir(s3, bucket, prefix):
    # Read each Parquet file into a pandas DataFrame and append it to a list
    dataframes = []
    for file in list_s3_objects(s3, bucket, prefix):
        df = pd.read_parquet(f"s3://{bucket}/{file}")
        dataframes.append(df)

    if dataframes:
        # Concatenate all the dataframes in the list
        merged_df = pd.concat(dataframes, ignore_index=True)

        return merged_df
    return None

def query_athena(bucket, prefix, sql_query, output_path, region, database, delete=False):
    """This function queries athena and saves the results in parquet format on s3 and deletes the default CSV and metadata files"""
    # Define the query
    query = f"""UNLOAD ({sql_query})
    TO '{output_path}'
    WITH (format = 'PARQUET',compression = 'SNAPPY')"""
    
    client = boto3.client('athena', region_name=region)
    s3 = boto3.client('s3')
    if len(list_s3_objects(s3, bucket=bucket, prefix=prefix))>0 and not delete:
        
        raise RuntimeError(f"Directory '{prefix}' is not empty. There are files in there {list_s3_objects(s3, bucket=bucket, prefix=prefix)}. It has to be an empty folder for the UNLOAD function to work.\nThe TO destination must specify a location in Amazon S3 that has no data. Before the UNLOAD query writes to the location specified, it verifies that the bucket location is empty. Because UNLOAD does not write data to the specified location if the location already has data in it, UNLOAD does not overwrite existing data. To reuse a bucket location as a destination for UNLOAD, delete the data in the bucket location, and then run the query again. https://docs.aws.amazon.com/athena/latest/ug/unload.html\n{query}")
    
    else:
        if delete:
            delete_files_in_s3_folder(bucket, prefix)
        # Execute the query
        response = client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={
                'Database': database
            },
            ResultConfiguration={
                'OutputLocation': output_path
            }
        )
    
        # Get the query execution ID
        query_execution_id = response['QueryExecutionId']
    
        # Polling Athena to check if the query is completed
        while is_query_still_running(client, query_execution_id):
            time.sleep(1)  # Wait for 1 seconds
        print('Query finished!')
    
        # List objects within the bucket
        files = list_s3_objects(s3, bucket=bucket, prefix=prefix)
        print('deleting extra files to just keep the parquet files')
        for file in files:
            if query_execution_id in file:
                print(f'deleting {file}')
                s3.delete_object(Bucket=bucket, Key=file)
        return load_parquet_dir(s3, bucket, prefix)

Upvotes: 1

jbgorski
jbgorski

Reputation: 1939

Answer

At this moment it isn't possible to do it directly with Athena. When it comes to configure result of the Athena query you can only setup query result location and encryption configuration.

Workaround

1) From October Athena supports CTAS query, you can try to use this feature.

https://docs.aws.amazon.com/athena/latest/ug/ctas.html

https://docs.aws.amazon.com/athena/latest/ug/ctas-examples.html

I think example 4 or 5 may interest you.

2) You can create Python script and use Boto3 framework. Here is the example, you can adapt this script to your requirements and add saving query results to the parquet format.

https://gist.github.com/schledererj/b2e2a800998d61af2bbdd1cd50e08b76

3) If you want to transform your data you should use rather such services as AWS EMR or AWS Glue.

Upvotes: 11

Related Questions