Reputation: 1291
I would like to create a data pipeline and that would be involked by lambda function. Data pipeline is "Load s3 data into RDS MYSQL ", Build using a template provided by AWS itself.
From my lambda function, I'm not able to define the parameters to send to my data pipeline. I wanted to send the below parameters to the data pipeline from lambda,
"myRDSInstanceId": "source-dev",
"myRDSUsername": "username",
"myRDSTableInsertSql": "INSERT INTO employee(id,name,salary) VALUES(?,?,?,)",
"*myRDSPassword": "https://www.ec2instances.info/?filter=m3",
"myInputS3Loc": "s3://services/employee/",
"myRDSTableName": "employee"
How's this possible?? Any help The python code for lambda and my pipline definition is also given below.
from __future__ import print_function
import json
import urllib
import boto3
def lambda_handler(event, context):
client = boto3.client('datapipeline')
print('Loading function here')
client.activate_pipeline(
pipelineId='df-095524176JKK0DOHDDDC',
parameterValues=[{'id':'myRDSTableName','stringValue':'employee'}])
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}
Pipeline Definition
{
"objects": [
{
"output": {
"ref": "DestinationRDSTable"
},
"input": {
"ref": "S3InputDataLocation"
},
"dependsOn": {
"ref": "RdsMySqlTableCreateActivity"
},
"name": "DataLoadActivity",
"id": "DataLoadActivity",
"runsOn": {
"ref": "Ec2Instance"
},
"type": "CopyActivity"
},
{
"subnetId": "subnet-XXXXX",
"instanceType": "m1.medium",
"name": "Ec2Instance",
"actionOnTaskFailure": "terminate",
"securityGroups": "#{myEc2RdsSecurityGrps}",
"id": "Ec2Instance",
"type": "Ec2Resource",
"terminateAfter": "1 Hours"
"terminateAfter": "1 Hours"
},
{
"database": {
"ref": "rds_mysql"
},
"name": "RdsMySqlTableCreateActivity",
"runsOn": {
"ref": "Ec2Instance"
},
"id": "RdsMySqlTableCreateActivity",
"type": "SqlActivity",
"script": "#{myRDSCreateTableSql}"
},
{
"*password": "password",
"name": "rds_mysql",
"id": "rds_mysql",
"type": "RdsDatabase",
"rdsInstanceId": "#{myRDSInstanceId}",
"username": "#{myRDSUsername}"
},
{
"name": "DataFormat1",
"columnSeparator": "|",
"id": "DataFormat1",
"type": "TSV",
"recordSeparator": "\\n"
},
{
"failureAndRerunMode": "CASCADE",
"resourceRole": "DataPipelineDefaultResourceRole",
"role": "DataPipelineDefaultRole",
"pipelineLogUri": "s3://logs/",
"scheduleType": "ONDEMAND",
"name": "Default",
"id": "Default"
},
{
"database": {
"ref": "rds_mysql"
},
"name": "DestinationRDSTable",
"insertQuery": "#{myRDSTableInsertSql}",
"id": "DestinationRDSTable",
"type": "SqlDataNode",
"table": "#{myRDSTableName}",
"selectQuery": "select * from #{table}"
},
{
"directoryPath": "#{myInputS3Loc}",
"dataFormat": {
"ref": "DataFormat1"
},
"name": "S3InputDataLocation",
"id": "S3InputDataLocation",
"type": "S3DataNode"
}
],
"parameters": [
{
"description": "RDS MySQL password",
"id": "*myRDSPassword",
"type": "String"
},
{
"watermark": "security group name",
"helpText": "The names of one or more EC2 security groups that have access to the RDS MySQL cluster.",
"description": "RDS MySQL security group(s)",
"isArray": "true",
"optional": "true",
"id": "myEc2RdsSecurityGrps",
"type": "String"
},
{
"description": "RDS MySQL username",
"id": "myRDSUsername",
"type": "String"
},
{
"description": "Input S3 file path",
"id": "myInputS3Loc",
"type": "AWS::S3::ObjectKey"
},
{
"helpText": "The SQL statement to insert data into the RDS MySQL table.",
"watermark": "INSERT INTO #{table} (col1, col2, col3) VALUES(?, ?, ?) ;",
"description": "Insert SQL query",
"id": "myRDSTableInsertSql",
"type": "String"
},
{
"helpText": "The name of an existing table or a new table that will be created based on the create table SQL query parameter below.",
"description": "RDS MySQL table name",
"id": "myRDSTableName",
"type": "String"
},
{
"watermark": "DB Instance",
"description": "RDS Instance ID",
"id": "myRDSInstanceId",
"type": "String"
}
],
"values": {
"myRDSInstanceId": "source-dev",
"myRDSUsername": "username",
"myRDSTableInsertSql": "INSERT INTO employee(id,name,salary) VALUES(?,?,?,)",
"*myRDSPassword": "https://www.ec2instances.info/?filter=m3",
"myInputS3Loc": "s3://services/employee/",
"myRDSTableName": "employee"
}
}
Upvotes: 0
Views: 1269
Reputation: 11
In the Lambda code, you have provided value only for parameter myRDSTableName
. When you run the Lambda function, it will only pass the value for myRDSTableName
and other parameters will be blank.
You need to pass either the values of other parameters, which are necessary for running the pipeline, in the Lambda function or set default parameter values in pipeline definition(in the parameter objects section).
Upvotes: 1
Reputation: 13581
The parameters for activate_pipeline
is given by a list input, so
client.activate_pipeline(
pipelineId='df-095524176JKK0DOHDDDC',
parameterValues=[
{
'id':'myRDSTableName',
'stringValue':'employee'
},
{
'id':'blah',
'stringValue':'blah'
},
...
]
)
add your parameter values repeatedly. You can check in more detailes from the boto3 documentation.
Upvotes: 0