Carlos P Ceballos

Reputation: 422

boto3 check if Athena database exists

I'm writing a script that creates a database in AWS Athena and then creates tables in that database. Today the database creation took a very long time, so the table-creation statements referred to a database that did not exist yet. Is there a way to check whether a database has already been created in Athena using boto3?

This is the part that creates the database:

import boto3

# `config` (the ResultConfiguration dict) is defined elsewhere in the script
client = boto3.client('athena')
client.start_query_execution(
    QueryString='create database {}'.format('db_name'),
    ResultConfiguration=config
)

Upvotes: 1

Views: 5393

Answers (3)

Vincent Claes

Reputation: 4768

Use the AWS SDK for pandas (awswrangler).

Installation: pip install awswrangler

Usage:

import awswrangler as wr

assert "some_db" in wr.catalog.databases().values

More info here: https://aws-sdk-pandas.readthedocs.io/en/3.5.0/tutorials/006%20-%20Amazon%20Athena.html#Checking/Creating-Glue-Catalog-Databases
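
If adding a dependency is not desirable, a similar check can probably be done with plain boto3 through Athena's `list_databases` paginator. This is a minimal sketch and not part of the original answer: the helper name `athena_database_exists` is made up for illustration, and `AwsDataCatalog` is assumed to be the catalog in use.

```python
def athena_database_exists(client, name, catalog="AwsDataCatalog"):
    """Return True if `name` is listed in the given Athena data catalog.

    `client` is assumed to be a boto3 Athena client, i.e. boto3.client("athena").
    The helper name and the default catalog are assumptions for illustration.
    """
    paginator = client.get_paginator("list_databases")
    for page in paginator.paginate(CatalogName=catalog):
        for database in page.get("DatabaseList", []):
            # Athena treats database names as case-insensitive,
            # so compare both sides in lower case.
            if database["Name"].lower() == name.lower():
                return True
    return False
```

It would be called as `athena_database_exists(boto3.client("athena"), "db_name")` before issuing the table-creation queries.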

Upvotes: 0

yardstick17

Reputation: 4592

# -*- coding: utf-8 -*-
import logging
import os
from time import sleep

import boto3

logger = logging.getLogger(__name__)


class AthenaQueryFailed(Exception):
    pass


class Athena(object):
    S3_TEMP_BUCKET = "please-replace-with-your-bucket"

    def __init__(self, bucket=S3_TEMP_BUCKET):
        self.bucket = bucket
        self.client = boto3.Session().client("athena")


    def execute_query_in_athena(self, query, output_s3_directory, database="csv_dumps"):
        """Execute a query in Athena and store the result under `output_s3_directory`.
        :param query: Query to be executed in Athena
        :param output_s3_directory: S3 path in which the results should be stored
        :param database: Athena database to run the query against
        :return: S3 path of the result file
        """
        response = self.client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={"Database": database},
            ResultConfiguration={"OutputLocation": output_s3_directory},
        )
        query_execution_id = response["QueryExecutionId"]
        filename = "{filename}.csv".format(filename=response["QueryExecutionId"])
        s3_result_path = os.path.join(output_s3_directory, filename)
        logger.info(
            "Query query_execution_id <<{query_execution_id}>>, result_s3path <<{s3path}>>".format(
                query_execution_id=query_execution_id, s3path=s3_result_path
            )
        )
        self.wait_for_query_to_complete(query_execution_id)
        return s3_result_path

    def wait_for_query_to_complete(self, query_execution_id):
        is_query_running = True
        backoff_time = 10
        while is_query_running:
            response = self.__get_query_status_response(query_execution_id)
            status = response["QueryExecution"]["Status"][
                "State"
            ]  # possible responses: QUEUED | RUNNING | SUCCEEDED | FAILED | CANCELLED
            if status == "SUCCEEDED":
                is_query_running = False
            elif status in ["CANCELLED", "FAILED"]:
                raise AthenaQueryFailed(status)
            elif status in ["QUEUED", "RUNNING"]:
                logger.info("Backing off for {} seconds.".format(backoff_time))
                sleep(backoff_time)
            else:
                raise AthenaQueryFailed(status)

    def __get_query_status_response(self, query_execution_id):
        response = self.client.get_query_execution(QueryExecutionId=query_execution_id)
        return response

As pointed out in the other answer, waiters for Athena are not implemented yet.

I use this lightweight Athena client to run the query; it returns the S3 path of the result once the query has completed.

Upvotes: 2

helloV

Reputation: 52433

The waiter functions for Athena are not implemented yet: Athena Waiter

See: Support AWS Athena waiter feature for a possible workaround until it is implemented in Boto3. This is how it is implemented in AWS CLI.

while True:
    # boto3 client methods accept keyword arguments only
    stats = self.athena.get_query_execution(QueryExecutionId=execution_id)
    status = stats['QueryExecution']['Status']['State']
    if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
        break
    time.sleep(0.2)
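
The same polling idea can be wrapped in a small function with a timeout, so a stuck query cannot block the script forever. This is only a sketch under assumptions: `wait_for_query`, `poll_seconds`, and `timeout_seconds` are hypothetical names, and `client` is assumed to be a boto3 Athena client.

```python
import time

# States after which Athena will not change the query status any more.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_query(client, execution_id, poll_seconds=1.0, timeout_seconds=300.0):
    """Poll Athena until the query reaches a terminal state and return that state.

    Raises TimeoutError if the query is still QUEUED or RUNNING after
    `timeout_seconds`. All names here are illustrative, not a boto3 API.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        response = client.get_query_execution(QueryExecutionId=execution_id)
        state = response["QueryExecution"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "Query {} still {} after {} seconds".format(
                    execution_id, state, timeout_seconds
                )
            )
        time.sleep(poll_seconds)
```

Calling it with the `QueryExecutionId` returned by `start_query_execution` for the `create database` statement, and creating the tables only once it returns `SUCCEEDED`, should avoid the race described in the question.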

Upvotes: 1
