Reputation: 151
I have a small pipeline I'm trying to execute:
I've created a Dataflow job through Dataprep, as it has a nice UI for doing all my transformations before writing to a BigQuery table (writing to BigQuery works fine), and the Cloud Function is triggered when a file is uploaded to the GCS bucket. However, the Cloud Function doesn't trigger the Dataflow job (the one I built in Dataprep).
Please have a look at the sample code of my Cloud Function below; any pointers as to why the Dataflow job is not triggering would be appreciated.
/**
* Triggered from a message on a Cloud Storage bucket.
*
* @param {!Object} event The Cloud Functions event.
* @param {!Function} callback The callback function.
*/
exports.processFile = (event, callback) => {
console.log('Processing file: ' + event.data.name);
callback();
const google = require('googleapis');
exports.CF_GCStoDataFlow_v2 = function(event, callback) {
const file = event.data;
if (file.resourceState === 'exists' && file.name) {
google.auth.getApplicationDefault(function (err, authClient, projectId) {
if (err) {
throw err;
}
if (authClient.createScopedRequired && authClient.createScopedRequired()) {
authClient = authClient.createScoped([
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/userinfo.email'
]);
}
const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });
dataflow.projects.templates.create({
projectId: projectId,
resource: {
parameters: {
inputFile: `gs://${file.bucket}/${file.name}`,
outputFile: `gs://${file.bucket}/${file.name}`
},
jobName: 'cloud-dataprep-csvtobq-v2-281345',
gcsPath: 'gs://mygcstest-pipeline-staging/temp/'
}
}, function(err, response) {
if (err) {
console.error("problem running dataflow template, error was: ", err);
}
console.log("Dataflow template response: ", response);
callback();
});
});
}
};
};
Upvotes: 1
Views: 2876
Reputation: 56
This snippet may help. It uses a different method of the Dataflow API (launch), and it worked for me. Be aware that you need to specify the template's URL, and also check the metadata file (you can find it in the same directory as the template when it is executed through the Dataprep interface) to make sure you are including the right parameters.
dataflow.projects.templates.launch({
projectId: projectId,
location: location,
gcsPath: jobTemplateUrl,
resource: {
parameters: {
inputLocations : `{"location1" :"gs://${file.bucket}/${file.name}"}`,
outputLocations: `{"location1" : "gs://${destination.bucket}/${destination.name}"}`,
},
environment: {
tempLocation: `gs://${destination.bucket}/${destination.tempFolder}`,
zone: "us-central1-f"
},
jobName: 'my-job-name',
}
}, function (err, response) {
  if (err) {
    console.error("problem launching dataflow template, error was: ", err);
  }
  console.log("Dataflow template response: ", response);
  callback();
});
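If you are not sure which parameter names the template expects (the snippet above uses inputLocations/outputLocations, whereas the question's code used inputFile/outputFile), you can read the metadata file mentioned above. Here is a minimal sketch, assuming the metadata file sits next to the template in GCS with a _metadata suffix and that the @google-cloud/storage client library is installed; the bucket and template paths in the example call are placeholders:

// Hypothetical helper: download and print the template's metadata file so you
// can confirm the exact parameter names before filling in resource.parameters.
const { Storage } = require('@google-cloud/storage');

function printTemplateMetadata(bucketName, templatePath) {
  const storage = new Storage();
  return storage
    .bucket(bucketName)
    .file(`${templatePath}_metadata`)
    .download()
    .then(([contents]) => {
      const metadata = JSON.parse(contents.toString());
      // Each entry describes one template parameter (name, label, helpText, ...).
      console.log('Template parameters:', metadata.parameters);
      return metadata;
    });
}

// Example call with placeholder values:
// printTemplateMetadata('my-template-bucket', 'templates/my-dataprep-template');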
Upvotes: 2
Reputation: 964
Looks like you are putting CF_GCStoDataFlow_v2 inside processFile, so the Dataflow part of the code never executes.
Your function should look like this:
/**
* Triggered from a message on a Cloud Storage bucket.
*
* @param {!Object} event The Cloud Functions event.
* @param {!Function} callback The callback function.
*/
exports.CF_GCStoDataFlow_v2 = (event, callback) => {
const google = require('googleapis');
const file = event.data; // the uploaded object's metadata from the GCS event
if (file.resourceState === 'exists' && file.name) {
google.auth.getApplicationDefault(function (err, authClient, projectId) {
if (err) {
throw err;
}
if (authClient.createScopedRequired && authClient.createScopedRequired()) {
authClient = authClient.createScoped([
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/userinfo.email'
]);
}
const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });
dataflow.projects.templates.create({
projectId: projectId,
resource: {
parameters: {
inputFile: `gs://${file.bucket}/${file.name}`,
outputFile: `gs://${file.bucket}/${file.name}`
},
jobName: '<JOB_NAME>',
gcsPath: '<BUCKET_NAME>'
}
}, function(err, response) {
if (err) {
console.error("problem running dataflow template, error was: ", err);
}
console.log("Dataflow template response: ", response);
callback();
});
});
} else {
  // Nothing to launch for this event; just acknowledge it so the function can exit.
  callback();
}
};
Make sure you change the value under “Function to execute” to CF_GCStoDataFlow_v2.
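If you want to sanity-check the handler before relying on the bucket trigger, one option (not part of the original answer; the bucket and file names below are placeholders, and Application Default Credentials are needed locally because the handler calls the Dataflow API) is to invoke the export directly with a fabricated GCS event payload:

// local-test.js - hypothetical smoke test for the function above.
const { CF_GCStoDataFlow_v2 } = require('./index');

CF_GCStoDataFlow_v2(
  { data: { bucket: 'my-bucket', name: 'uploads/sample.csv', resourceState: 'exists' } },
  function () { console.log('handler finished'); }
);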
Upvotes: 1
Reputation: 47
Have you submitted your Dataproc job? Has it started running? The documentation below can give you some idea of how to get started:
https://cloud.google.com/dataproc/docs/concepts/jobs/life-of-a-job
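If you want to confirm whether a job was actually submitted, here is a minimal sketch (not part of this answer; it reuses the same googleapis client setup as the question's function) that lists recent Dataflow jobs in the project and prints their states:

// Hypothetical check: list Dataflow jobs in the project to see whether the
// template launch created one, using the same auth pattern as the question.
const google = require('googleapis');

google.auth.getApplicationDefault(function (err, authClient, projectId) {
  if (err) throw err;
  if (authClient.createScopedRequired && authClient.createScopedRequired()) {
    authClient = authClient.createScoped(['https://www.googleapis.com/auth/cloud-platform']);
  }
  const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });
  dataflow.projects.jobs.list({ projectId: projectId }, function (err, response) {
    if (err) {
      console.error('Could not list Dataflow jobs:', err);
      return;
    }
    (response.jobs || []).forEach(function (job) {
      console.log(job.name, job.currentState);
    });
  });
});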
Upvotes: 1