Reputation: 51
I have a few views in AWS Athena that are being accessed in our catalog from another AWS account/team. AWS does not natively support accessing views through a Glue ETL job as I get an "error code 10" when trying to do so. I can access the data just fine in Athena. How can I get around this?
Upvotes: 0
Views: 1696
Reputation: 16
As the OP notes, AWS Glue indeed has no native support for reading Data Catalog views.
One handy workaround is to use Athena to query the view and bring the result into Glue.
I don't recommend glueContext with get_query_results, due to the limit on the number of records it returns. Instead, try using awswrangler:
import awswrangler as wr

# Runs the query in Athena and loads the full result set into a pandas DataFrame
df = wr.athena.read_sql_query(sql="SELECT * FROM <table_name_in_Athena>", database="<database_name>")
Be mindful that this works best if your view is not too large and the result can be loaded into a pandas DataFrame.
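To gauge whether the pandas route is viable for your view, you can check the in-memory size of the result. A minimal sketch with stand-in data (in practice, df would come from the awswrangler call above):

```python
import pandas as pd

# Stand-in for the DataFrame returned by wr.athena.read_sql_query
df = pd.DataFrame({"id": range(1000), "value": ["x"] * 1000})

# deep=True accounts for the actual size of the Python string objects
mem_mb = df.memory_usage(deep=True).sum() / 1024 ** 2
print(f"result occupies ~{mem_mb:.2f} MB in memory")
```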
Upvotes: 0
Reputation: 11
It's probably too late, but I want to share my experience, as some time ago I was searching for an answer to the same question.
You can use the extra-jars argument when starting your ETL job (in my case it was a PySpark ETL job) and pass the additional jar file:
--extra-jars="s3://your_bucket/path/to/file/AthenaJDBC42_2.0.27.1000.jar"
This jar adds the Athena JDBC driver classes to your execution environment (Spark runs on the JVM), and you'll then be able to query everything in Athena:
def query_athena(query):
    return (
        glueContext.read.format("jdbc")
        .option("driver", "com.simba.athena.jdbc.Driver")
        .option(
            "AwsCredentialsProviderClass",
            "com.simba.athena.amazonaws.auth.InstanceProfileCredentialsProvider",
        )
        .option("url", "jdbc:awsathena://athena.us-east-1.amazonaws.com:443")
        .option("S3OutputLocation", "<your spill bucket location>")
        .option("query", query)
        .load()
    )

df = query_athena("""
    select * from "database"."table"
    limit 100
""")
I used AthenaJDBC42_2.0.27.1000.jar, but there may be newer versions of this driver. You can find official download links in the AWS docs.
Hope this will save someone's time :)
Upvotes: 1
Reputation: 51
One method is to use Boto3 to run the query through Athena and write the result to S3.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3 #this is not generated and needs to be placed here
from awsglue.dynamicframe import DynamicFrame
print("Glue Job Started") #not needed but will help to make sure the job started when troubleshooting
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext() ##setting SparkContext
glueContext = GlueContext(sc)
spark = glueContext.spark_session
glueContext._jsc.hadoopConfiguration().set("fs.s3.useRequesterPaysHeader","true") ## needed when the other account's S3 bucket is configured as requester pays
spark._jsc.hadoopConfiguration().set("fs.s3.useRequesterPaysHeader","true") ## same setting on the Spark session's Hadoop config
#below is for establishing boto3 client connectivity
athena_client = boto3.client('athena') #needed
print("Athena Client Created") #not needed but good for troubleshooting
s3 = boto3.client('s3') #needed
# The block below deletes the current data in your S3 folder, since boto3 will
# land the data from the views here first and then run your query. Keep it if
# you want to overwrite the previous data; otherwise it is optional.
s3resource = boto3.resource('s3')
bucket = s3resource.Bucket('[your bucket]') #replace with your bucket name
for obj in bucket.objects.filter(Prefix='ETL/shared/icoms_hp/'): # delete from new path
    s3resource.Object(bucket.name, obj.key).delete()
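If you keep the destination as a full s3:// URI elsewhere in your job config, a small helper (hypothetical, not part of the original answer) can split it into the bucket name and prefix that the deletion loop above needs:

```python
from urllib.parse import urlparse

def split_s3_uri(uri):
    """Split an s3://bucket/key/prefix URI into (bucket, prefix)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError(f"not an s3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")

bucket_name, prefix = split_s3_uri("s3://my-bucket/ETL/shared/icoms_hp/")
print(bucket_name, prefix)  # my-bucket ETL/shared/icoms_hp/
```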
#this is the start of your main query
query = """
UNLOAD (
    -- write your SQL query here
)
TO '[internal S3 path]' -- where you want the final data to land in your S3 path
WITH (
    format = 'PARQUET', -- the output format you want
    compression = 'SNAPPY'
)
"""
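If you build several of these, wrapping the UNLOAD boilerplate in a small helper keeps the SELECT separate from the output settings. A sketch (the function name and defaults are my own, not part of the original answer):

```python
def build_unload_query(select_sql, s3_path, fmt="PARQUET", compression="SNAPPY"):
    """Wrap a SELECT statement in an Athena UNLOAD targeting s3_path."""
    return (
        f"UNLOAD (\n{select_sql}\n)\n"
        f"TO '{s3_path}'\n"
        f"WITH (format = '{fmt}', compression = '{compression}')"
    )

q = build_unload_query("SELECT * FROM my_db.my_view", "s3://my-bucket/out/")
print(q)
```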
#below block will save the temp data during execution
response = athena_client.start_query_execution(
    QueryString=query,
    ResultConfiguration={
        'OutputLocation': '[your temp s3 output folder]'
    }
)
#below block polls the query status; it also lets you find the query in the Athena console
import time  # used to pause between polls instead of hammering the Athena API

queryid = response['QueryExecutionId']
print(queryid)
status = athena_client.get_query_execution(QueryExecutionId=queryid)['QueryExecution']['Status']['State']
while status.upper() in ['QUEUED', 'RUNNING']:
    time.sleep(2)  # brief pause between polls
    status = athena_client.get_query_execution(QueryExecutionId=queryid)['QueryExecution']['Status']['State']
    print(f"status - {status}")
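The polling step can also be wrapped in a reusable waiter that returns the terminal state. A sketch with a hypothetical function name; the client is passed in as an argument, so the logic can be exercised without AWS:

```python
import time

def wait_for_query(client, query_id, poll_seconds=2):
    """Poll Athena until the query leaves QUEUED/RUNNING; return final state."""
    while True:
        state = client.get_query_execution(QueryExecutionId=query_id)[
            'QueryExecution']['Status']['State']
        if state.upper() not in ('QUEUED', 'RUNNING'):
            return state
        time.sleep(poll_seconds)
```

Calling wait_for_query(athena_client, queryid) and checking the return value against 'SUCCEEDED' replaces the loop above.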
Upvotes: 2