Maazen

Reputation: 107

Step function to invoke glue job and lambda function with passed parameters

Scenario: I want to pass the S3 source file location and the S3 output file location as input parameters in my workflow.

Workflow: AWS Step Function -> Lambda function -> Glue job. I want to pass the parameters from the Step Function to the Lambda function to the Glue job, where the Glue job performs a transformation on the S3 input file and writes its output to the S3 output file.

Below are the Step Function, Lambda function, and Glue job respectively, along with the input JSON that is passed to the Step Function.

1: Input (parameters passed):

{
    "S3InputFileLocation": "s3://bucket_name/sourcefile.csv",
    "S3OutputFileLocation": "s3://bucket_name/FinalOutputfile.csv"
}

2: Step Function / state machine (which calls the Lambda with the above input parameters):

{
   "StartAt":"AWSStepFunctionInitiator",
   "States":{  
      "AWSStepFunctionInitiator": {  
         "Type":"Task",
         "Resource":"arn:aws:lambda:us-east-1:xxxxxx:function:AWSLambdaFunction",
         "InputPath": "$",
         "End": true
      }
   }
}

3: Lambda function (i.e. AWSLambdaFunction invoked above, which in turn calls the AWSGlueJob below):

import json
import boto3

def lambda_handler(event, context):
    client = boto3.client("glue")
    client.start_job_run(
        JobName='AWSGlueJob',
        Arguments={
            'S3InputFileLocation': event["S3InputFileLocation"],
            'S3OutputFileLocation': event["S3OutputFileLocation"]})
    return {
        'statusCode': 200,
        'body': json.dumps('AWS lambda function invoked!')
    }

4: AWS Glue Job Script:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

print('AWS Glue Job started')
args = getResolvedOptions(sys.argv, ['AWSGlueJob','S3InputFileLocation', 'S3OutputFileLocation'])
S3InputFileLocation= args['S3InputFileLocation']
S3OutputFileLocation= args['S3OutputFileLocation']

glueContext = GlueContext(SparkContext.getOrCreate())

dfnew = glueContext.create_dynamic_frame_from_options("s3", {'paths': [S3InputFileLocation]}, format="csv")

datasink = glueContext.write_dynamic_frame.from_options(frame=dfnew, connection_type="s3", connection_options={"path": S3OutputFileLocation}, format="csv", transformation_ctx="datasink")

The above Step Function and the corresponding workflow execute without any compilation or runtime errors, and I can see the parameters successfully passed from the Step Function to the Lambda function. However, none of my print statements in the Glue job show up in CloudWatch, which suggests there is an issue when the Lambda function calls the Glue job. Kindly help me figure out whether there is a problem with the way I am invoking Glue from Lambda.

Upvotes: 1

Views: 2227

Answers (1)

XeBoris

Reputation: 1

Hi,

Maybe it is already solved, but these two links help:

  1. https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-get-resolved-options.html
  2. https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
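
In short, per those docs: Glue hands job arguments to the script on the command line as --Name value pairs, while getResolvedOptions is called with the bare names and returns them without the dashes. A minimal sketch of the Glue-side read, reusing the argument names from the question:

    import sys
    from awsglue.utils import getResolvedOptions

    # Glue invokes the script roughly as:
    #   script.py --S3InputFileLocation s3://... --S3OutputFileLocation s3://...
    # getResolvedOptions strips the leading '--' and returns a plain dict
    args = getResolvedOptions(sys.argv, ['S3InputFileLocation', 'S3OutputFileLocation'])
    print(args['S3InputFileLocation'], args['S3OutputFileLocation'])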

And to give a precise answer to your question: add '--' to the argument names here:

    import json
    import boto3

    def lambda_handler(event, context):
        client = boto3.client("glue")
        client.start_job_run(
            JobName='AWSGlueJob',
            Arguments={
                '--S3InputFileLocation': event["S3InputFileLocation"],
                '--S3OutputFileLocation': event["S3OutputFileLocation"]})
        return {
            'statusCode': 200,
            'body': json.dumps('AWS lambda function invoked!')
        }
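
If you also want to confirm that the run actually started, start_job_run returns a JobRunId, and get_job_run reports the run state, so you can log or poll it. A minimal sketch under that assumption (the check_run helper name is just illustrative, not part of the original post):

    import boto3

    glue = boto3.client("glue")

    def check_run(job_name, run_id):
        # run_id is the 'JobRunId' returned by start_job_run
        run = glue.get_job_run(JobName=job_name, RunId=run_id)
        # JobRunState is e.g. STARTING, RUNNING, SUCCEEDED or FAILED
        return run['JobRun']['JobRunState']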

Upvotes: 0
