saumya

Reputation: 23

Transferring files into S3 buckets based on Glue job status

I am new to **AWS Glue**, and my aim is to extract, transform, and load files uploaded to an S3 bucket into an RDS instance. I also need to transfer the files into separate S3 buckets based on the Glue job status (success/failure). More than one file will be uploaded into the initial S3 bucket. How can I get the names of the uploaded files so that I can transfer them to the appropriate buckets?

Step 1: Upload files to S3 bucket1.
Step 2: Trigger a Lambda function to call Job1.
Step 3: On success of Job1, transfer the file to S3 bucket2.
Step 4: On failure, transfer it to another S3 bucket.

Upvotes: 0

Views: 2766

Answers (1)

Yuva

Reputation: 3153

Have a Lambda event trigger listening to the S3 folder you are uploading the files to. In the Lambda, use the AWS Glue API to run the Glue job (essentially a Python script in AWS Glue).

In the Glue Python script, use the appropriate library, such as pymysql, packaged with your Python script as an external library.
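A hedged sketch of one way to attach such a library, using the --extra-py-files special parameter when starting the job (the S3 path and job name below are placeholders; the job's "Python library path" field in the console sets the same thing):

import boto3

glue = boto3.client('glue')

# Point the job run at a packaged copy of the library (egg/zip/whl uploaded to S3)
# so the Glue script can "import pymysql" at run time.
# The S3 path and job name are placeholders - replace them with your own.
glue.start_job_run(
    JobName="<YOUR GLUE JOB NAME>",
    Arguments={'--extra-py-files': 's3://<your-bucket>/libs/pymysql.zip'}
)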

Perform the data load operations from S3 to your RDS tables. If you are using Aurora MySQL, AWS provides a nice "load from S3" feature, so you can load the file directly into the tables (you may need to do some configuration in the parameter group / IAM roles).
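For the parameter group / IAM part, a minimal sketch with boto3, assuming placeholder names for the cluster, the cluster parameter group, and an IAM role that can read your bucket (the DB user also needs the LOAD FROM S3 privilege granted on the MySQL side):

import boto3

rds = boto3.client('rds')

# Placeholder identifiers - replace with your own cluster, parameter group and role ARN
cluster_id = "<YOUR AURORA CLUSTER ID>"
param_group = "<YOUR CLUSTER PARAMETER GROUP>"
s3_role_arn = "arn:aws:iam::<ACCOUNT_ID>:role/<AuroraLoadFromS3Role>"

# Associate an IAM role (with s3:GetObject on your bucket) with the Aurora cluster
rds.add_role_to_db_cluster(DBClusterIdentifier=cluster_id, RoleArn=s3_role_arn)

# Point the cluster at that role via the aurora_load_from_s3_role cluster parameter
rds.modify_db_cluster_parameter_group(
    DBClusterParameterGroupName=param_group,
    Parameters=[{
        'ParameterName': 'aurora_load_from_s3_role',
        'ParameterValue': s3_role_arn,
        'ApplyMethod': 'pending-reboot'
    }]
)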

Lambda script to call the Glue job:

import boto3

s3 = boto3.client('s3')
glue = boto3.client('glue')

def lambda_handler(event, context):
    gluejobname = "<YOUR GLUE JOB NAME>"

    try:
        # Start the Glue job and check the state of the new run
        runId = glue.start_job_run(JobName=gluejobname)
        status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
        print("Job Status : ", status['JobRun']['JobRunState'])
    except Exception as e:
        # Log and re-raise so the Lambda invocation is marked as failed
        print(e)
        raise e
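To get the names of the uploaded files (the original question): the S3 event that invokes the Lambda already carries the bucket and the object key, so you can pull them out of the event and, for example, hand the key to the Glue job as a job argument. A rough sketch, where the --s3_key argument name is just an example:

import urllib.parse

def get_uploaded_file(event):
    # Each record of an S3 event notification carries the bucket name and the
    # URL-encoded object key of the file that was just uploaded
    record = event['Records'][0]
    bucket = record['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(record['s3']['object']['key'])
    return bucket, key

# Inside lambda_handler you could then pass the key along, e.g.
#   glue.start_job_run(JobName=gluejobname, Arguments={'--s3_key': key})
# and read it in the Glue script with awsglue.utils.getResolvedOptions.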

Glue Script:

import mysql.connector
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import *
from pyspark.sql.types import StringType
from pyspark.sql.types import DateType
from pyspark.sql.functions import unix_timestamp, from_unixtime
from pyspark.sql import SQLContext

# Create a Glue context
glueContext = GlueContext(SparkContext.getOrCreate())

url="<RDS URL>"
uname="<USER NAME>"
pwd="<PASSWORD>"
dbase="DBNAME"


def connect():
    conn = mysql.connector.connect(host=url, user=uname, password=pwd, database=dbase)
    cur = conn.cursor()
    return cur, conn

def create_stg_table():
    cur, conn = connect()

    # Create the staging table if required, then bulk load the CSV straight
    # from S3 using Aurora MySQL's "LOAD DATA FROM S3"
    createStgTable1 = "<CREATE STAGING TABLE IF REQUIRED>"
    loadQry = "LOAD DATA FROM S3 PREFIX 'S3://PATH FOR YOUR CSV' REPLACE INTO TABLE <DB.TABLENAME> FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' IGNORE 1 LINES (@var1, @var2, @var3, @var4, @var5, @var6, @var7, @var8) SET ......;"

    cur.execute(createStgTable1)
    cur.execute(loadQry)
    conn.commit()
    conn.close()

# Kick off the staging load when the Glue job runs
create_stg_table()

You can then create a CloudWatch Events rule / alert that checks for the Glue job status and, depending on the status, performs the file copy operations between the S3 buckets. We have a similar setup in our production.
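As a sketch of that last step, assuming the "Glue Job State Change" event is routed from CloudWatch Events / EventBridge to a second Lambda, and that this Lambda can work out which object was being processed (for example because you recorded the key somewhere or passed it through as a job argument), the move itself is just a copy plus a delete; the bucket names and the key below are placeholders:

import boto3

s3 = boto3.client('s3')

SOURCE_BUCKET = "<BUCKET1>"      # where the files are uploaded
SUCCESS_BUCKET = "<BUCKET2>"     # destination when the job succeeds
FAILURE_BUCKET = "<BUCKET3>"     # destination when the job fails

def lambda_handler(event, context):
    # "Glue Job State Change" events carry the final state of the run
    state = event['detail']['state']
    key = "<OBJECT KEY OF THE PROCESSED FILE>"

    target = SUCCESS_BUCKET if state == 'SUCCEEDED' else FAILURE_BUCKET
    s3.copy_object(Bucket=target, Key=key,
                   CopySource={'Bucket': SOURCE_BUCKET, 'Key': key})
    s3.delete_object(Bucket=SOURCE_BUCKET, Key=key)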

Regards

Yuva

Upvotes: 1
