Reputation: 709
I am relatively new to AWS, so I am not sure how to go about this.
I have CSV files on S3 and I have already set up the Aurora instance on RDS. The thing I am unable to figure out is how to automate the bulk loading of the data, essentially doing the equivalent of a LOAD DATA FROM S3
using something like AWS Glue.
I also tried the Glue-native S3-to-RDS loading, but that ends up being a bunch of inserts into RDS over a JDBC connection, which is very slow for large datasets.
I can run the command on RDS independently, but I do not want to do that; I want to leverage Glue. I also looked at using a MySQL connector for Python, but Glue natively only supports Python 2.7, which I do not want to use.
Any help would be greatly appreciated.
Upvotes: 2
Views: 7247
Reputation: 3153
The approach is as stated above: set up an S3 event trigger and a Lambda function listening on the S3 bucket/object location. As soon as a file is uploaded to that S3 location, the Lambda function runs, and inside the Lambda you can call an AWS Glue job. This is exactly what we have done, and it has gone successfully live. Lambda has a 15-minute execution limit, and it should take less than a minute to trigger/start a Glue job.
Please find herewith a sample source for reference.
from __future__ import print_function
import json
import boto3

print('Loading function')

s3 = boto3.client('s3')
glue = boto3.client('glue')

def lambda_handler(event, context):
    gluejobname = "your-glue-job-name-here"

    # The S3 event carries the bucket and key of the object that triggered this function
    source_bucket = event['Records'][0]['s3']['bucket']['name']
    source_key = event['Records'][0]['s3']['object']['key']

    try:
        # Start the Glue job and report its initial run state
        runId = glue.start_job_run(JobName=gluejobname)
        status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
        print("Job Status : ", status['JobRun']['JobRunState'])
    except Exception as e:
        print(e)
        print('Error processing object {} from bucket {}. Make sure they exist '
              'and your bucket is in the same region as this '
              'function.'.format(source_key, source_bucket))
        raise e
To create the Lambda function, go to AWS Lambda -> Create a new function from scratch -> select S3 as the event source, and then configure the S3 bucket location and prefixes as required. Then paste the above code sample into the inline code area and set the Glue job name as needed. Please ensure you have all the required IAM roles/access set up.
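For the IAM part, the Lambda execution role needs permission to start and poll the Glue job. Below is a minimal sketch of attaching such an inline policy with boto3; the role name, policy name, and the wildcard resource are placeholders you would replace (normally you would do this through the IAM console or your infrastructure tooling and scope the resource to your Glue job ARN).
import json
import boto3

iam = boto3.client('iam')

# Hypothetical policy: allow the Lambda role to start and check Glue job runs
policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["glue:StartJobRun", "glue:GetJobRun"],
        "Resource": "*"  # scope down to your Glue job ARN in practice
    }]
}

iam.put_role_policy(
    RoleName="my-lambda-execution-role",   # assumed role name
    PolicyName="allow-start-glue-job",     # assumed policy name
    PolicyDocument=json.dumps(policy)
)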
The Glue job should have a connection to your Aurora instance, and then you can use the "LOAD DATA FROM S3 ..." command provided by Aurora. Make sure all the parameter group settings/configurations are done as needed (the cluster also needs an IAM role that allows it to read from your S3 bucket).
Let me know if you run into any issues.
UPDATE: Sample code snippet for LOAD DATA FROM S3:
import mysql.connector

# url, uname, pwd, dbase hold the Aurora endpoint and credentials
conn = mysql.connector.connect(host=url, user=uname, password=pwd, database=dbase)
cur = conn.cursor()

createStgTable1 = "DROP TABLE IF EXISTS mydb.STG_TABLE;"
createStgTable2 = "CREATE TABLE mydb.STG_TABLE(COL1 VARCHAR(50) NOT NULL, COL2 VARCHAR(50), COL3 VARCHAR(50), COL4 CHAR(1) NOT NULL);"
# Note the escaped newline so the literal \n reaches MySQL in the statement
loadQry = "LOAD DATA FROM S3 PREFIX 's3://<bucketname>/folder' REPLACE INTO TABLE mydb.STG_TABLE FIELDS TERMINATED BY '|' LINES TERMINATED BY '\\n' IGNORE 1 LINES (@var1, @var2, @var3, @var4) SET col1= @var1, col2= @var2, col3= @var3, col4= @var4;"

cur.execute(createStgTable1)
cur.execute(createStgTable2)
cur.execute(loadQry)
conn.commit()
conn.close()
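If LOAD DATA FROM S3 fails with an access error, the usual cause is that the Aurora cluster has no IAM role authorizing it to read the bucket. Below is a rough sketch of that one-time setup with boto3 for Aurora MySQL; the cluster identifier, parameter group name, and role ARN are placeholders for your own values, and the change still needs a reboot of the writer instance to take effect.
import boto3

rds = boto3.client('rds')

# Attach an IAM role (with s3:GetObject/s3:ListBucket on your bucket) to the cluster
rds.add_role_to_db_cluster(
    DBClusterIdentifier='my-aurora-cluster',                       # assumed identifier
    RoleArn='arn:aws:iam::123456789012:role/aurora-s3-read-role'   # assumed role ARN
)

# Point the cluster at that role via the cluster parameter group
rds.modify_db_cluster_parameter_group(
    DBClusterParameterGroupName='my-aurora-cluster-params',        # assumed parameter group
    Parameters=[{
        'ParameterName': 'aurora_load_from_s3_role',
        'ParameterValue': 'arn:aws:iam::123456789012:role/aurora-s3-read-role',
        'ApplyMethod': 'pending-reboot'
    }]
)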
Upvotes: 5