cpljp

Reputation: 51

How to query Views in Athena in an AWS Glue ETL job

I have a few views in AWS Athena that are being accessed in our catalog from another AWS account/team. AWS Glue does not natively support reading views in an ETL job; I get an "error code 10" when trying to do so. I can access the data just fine in Athena itself. How can I get around this?

Upvotes: 0

Views: 1696

Answers (3)

Diego Lottermann

Reputation: 16

As the OP notes, AWS Glue indeed doesn't have native support for reading Data Catalog views.

One workaround that can be handy is to have Athena run the query against the view and then use the result in Glue.

I don't recommend the glueContext / get_query_results route, due to the limit on the number of records it returns. Instead, try using awswrangler:

import awswrangler as wr  
df = wr.athena.read_sql_query(sql="SELECT * FROM <table_name_in_Athena>", database="<database_name>")

Be mindful that this works best if your view is not too large and can be loaded into a pandas DataFrame.
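If you then need the result back on the Spark side of the job, one option is to convert the pandas DataFrame into a Glue DynamicFrame. A minimal sketch, assuming a glueContext and Spark session are already set up in your Glue job (the frame name is just a placeholder):

from awsglue.dynamicframe import DynamicFrame

# convert the pandas DataFrame returned by awswrangler into a Spark DataFrame,
# then into a Glue DynamicFrame so the rest of the ETL job can use it
spark_df = spark.createDataFrame(df)
dyf = DynamicFrame.fromDF(spark_df, glueContext, "view_data")  # "view_data" is a placeholder name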

Upvotes: 0

Adrian Vorobel

Reputation: 11

It's probably too late, but I just want to share my experience, since some time ago I was searching for an answer to the same question. You can use the --extra-jars argument when starting your ETL job (in my case it was a PySpark ETL job) and pass the additional jar file:

--extra-jars="s3://your_bucket/path/to/file/AthenaJDBC42_2.0.27.1000.jar"
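If you configure the job through the API rather than the console, one place to put this is the job's DefaultArguments. A minimal boto3 sketch, where the job name, role, and script path are placeholders:

import boto3

glue = boto3.client("glue")
glue.create_job(
    Name="athena-view-etl",            # placeholder job name
    Role="MyGlueServiceRole",          # placeholder IAM role with Glue/Athena/S3 access
    Command={"Name": "glueetl", "ScriptLocation": "s3://your_bucket/scripts/job.py"},
    DefaultArguments={
        "--extra-jars": "s3://your_bucket/path/to/file/AthenaJDBC42_2.0.27.1000.jar"
    },
)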

This jar adds the Athena JDBC driver classes to the job's execution environment (Spark runs on the JVM), and you'll be able to query everything you can see in Athena:

def query_athena(query):
    return (
        glueContext.read.format("jdbc")
        .option("driver", "com.simba.athena.jdbc.Driver")
        .option(
            "AwsCredentialsProviderClass",
            "com.simba.athena.amazonaws.auth.InstanceProfileCredentialsProvider",
        )
        .option("url", "jdbc:awsathena://athena.us-east-1.amazonaws.com:443")
        .option("S3OutputLocation", "<your spill bucket location>")  # where Athena writes query results
        .option("query", query)
        .load()
    )

df = query_athena("""
    select * from "database"."table" 
    limit 100
""")

I used AthenaJDBC42_2.0.27.1000.jar, but there could be newer versions of this driver. You can find official download links in the AWS docs.

Hope this will save someone's time :)

Upvotes: 1

cpljp

Reputation: 51

One method you can use is Boto3.

import sys
import time  # used below to pause between status checks
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3  # this is not generated and needs to be placed here
from awsglue.dynamicframe import DynamicFrame

print("Glue Job Started")  # not needed, but helps confirm the job started when troubleshooting
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()  # setting SparkContext
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# these two settings are needed for permissions to access someone else's data catalog
glueContext._jsc.hadoopConfiguration().set("fs.s3.useRequesterPaysHeader", "true")
spark._jsc.hadoopConfiguration().set("fs.s3.useRequesterPaysHeader", "true")

# below is for establishing boto3 client connectivity
athena_client = boto3.client('athena')  # needed
print("Athena Client Created")  # not needed but good for troubleshooting
s3 = boto3.client('s3')  # needed

# The block below deletes the current data in your S3 folder, because boto3 will bring
# the data from the views here first and then run your query. If you want to overwrite
# the previous data you will need this; otherwise it is optional.
s3resource = boto3.resource('s3')
bucket = s3resource.Bucket('[your bucket]')  # replace with your bucket
for obj in bucket.objects.filter(Prefix='ETL/shared/icoms_hp/'):  # delete from new path
    s3resource.Object(bucket.name, obj.key).delete()

# this is the start of your main query
query = """
UNLOAD (
    -- write your SQL query here
)
TO '[internal S3 path]'  -- this is where you want the final data to be populated in your S3 path
WITH (
    format = 'PARQUET',       -- the output format you want
    compression = 'SNAPPY'
);
"""

# the block below saves the temp data during execution
response = athena_client.start_query_execution(
    QueryString=query,
    ResultConfiguration={
        'OutputLocation': '[your temp s3 output folder]'
    }
)

# the block below is for troubleshooting and seeing the query in Athena
queryid = response['QueryExecutionId']
print(queryid)
status = athena_client.get_query_execution(QueryExecutionId=queryid)['QueryExecution']['Status']['State']
while status.upper() in ['QUEUED', 'RUNNING']:
    time.sleep(2)  # brief pause so we don't hammer the Athena API while polling
    status = athena_client.get_query_execution(QueryExecutionId=queryid)['QueryExecution']['Status']['State']
    print(f"status - {status}")

Upvotes: 2
