Reputation: 75
This is my first data analytics project, and I'm working on a data pipeline on AWS. The pipeline steps should be as follows:
I'm stuck at step 3. In this step, a validation team must filter the valid data and fix the invalid data: they query the data from the first S3 bucket, update the invalid records, and copy the result to the second S3 bucket.
One constraint is that the old, invalid data must remain as-is in the first bucket; the update must happen as part of the transfer.
Is there a way to use the UPDATE statement in Athena?
Could I pass the required UPDATE statement to an AWS Glue job?
I tried the following script from this question to update the data:
from pyspark.context import SparkContext
from awsglue.context import GlueContext
spark_session = glueContext.spark_session
sqlContext = SQLContext(spark_session.sparkContext, spark_session)
glueContext = GlueContext(SparkContext.getOrCreate())
dyF = glueContext.create_dynamic_frame.from_catalog(database='{{database}}',table_name='{{table_name}}'
df = dyF.toDF()
df.registerTempTable('{{name}}')
df = sqlContext.sql("{{sql_query_on_above_created_table}}")
df.format('parquet').save('{{s3_location}}')
But I got this error:
SyntaxError: invalid syntax (Untitled job.py, line 7)
I read about using AWS Data Pipeline with EMR, but it seems complicated, and I can't see how it would serve on-demand queries.
What is the best way to update the data while transferring it between two S3 buckets, keeping the old version as-is in the first bucket and writing the updated data to the second bucket?
Upvotes: 0
Views: 1482
Reputation: 8493
A couple of options come to mind for step 3.
First, Athena doesn't support UPDATE statements, but you could likely use a CTAS (CREATE TABLE AS SELECT) query:
CREATE TABLE new_table
WITH (
  format = 'Parquet',
  external_location = 's3://my-other-bucket/',
  write_compression = 'SNAPPY'
)
AS SELECT *, plusSomeTransformations
FROM existing_table;
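If you need to trigger that CTAS from a pipeline step rather than from the console, one option is to submit it through the Athena API with boto3. A minimal sketch; the database name, bucket paths, and plusSomeTransformations placeholder are assumptions you'd swap for your own values:
import boto3

# Athena client; assumes credentials and region are already configured for the calling role
athena = boto3.client('athena')

ctas_query = """
CREATE TABLE new_table
WITH (
  format = 'Parquet',
  external_location = 's3://my-other-bucket/',
  write_compression = 'SNAPPY'
)
AS SELECT *, plusSomeTransformations
FROM existing_table
"""

# Submit the CTAS; Athena writes the new Parquet files to external_location
# while the source data in the first bucket stays untouched
response = athena.start_query_execution(
    QueryString=ctas_query,
    QueryExecutionContext={'Database': 'myDB'},
    ResultConfiguration={'OutputLocation': 's3://my-query-results-bucket/'}
)
print(response['QueryExecutionId'])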
Or with Glue, as you alluded to in your question:
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from awsglue.context import GlueContext

# Initialise the Glue and Spark contexts
glueContext = GlueContext(SparkContext.getOrCreate())
spark_session = glueContext.spark_session
sqlContext = SQLContext(spark_session.sparkContext, spark_session)

# Read the source table from the Glue Data Catalog
dyF = glueContext.create_dynamic_frame.from_catalog(database='myDB', table_name='existing_table')
df = dyF.toDF()

# Register a temp table so the "update" can be expressed as a SQL SELECT
df.registerTempTable('myTable')
df = sqlContext.sql("SELECT *, plusSomeTransformations FROM myTable")

# Write the transformed data to the second bucket; the first bucket is never modified
df.write.format('parquet').save('s3://my-other-bucket/')
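If you'd rather keep the write in DynamicFrame form (for example to reuse Glue's S3 sink options), you could convert back and write with write_dynamic_frame. A minimal sketch, assuming the same transformed df and glueContext from the script above:
from awsglue.dynamicframe import DynamicFrame

# Convert the transformed DataFrame back to a DynamicFrame and write it
# as Parquet to the second bucket; the source bucket is left untouched
dyF_out = DynamicFrame.fromDF(df, glueContext, 'dyF_out')
glueContext.write_dynamic_frame.from_options(
    frame=dyF_out,
    connection_type='s3',
    connection_options={'path': 's3://my-other-bucket/'},
    format='parquet'
)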
Upvotes: 0