BenAhm

Reputation: 151

Cloud Function to Trigger DataPrep Dataflow Job

I have a small pipeline I'm trying to execute:

  1. A file is placed into a GCS bucket.
  2. A Cloud Function triggers the Dataflow job when the file lands in the bucket (not working).
  3. The job writes to a BigQuery table (this part is working).

I've created the Dataflow job through Dataprep, since it has a nice UI for doing all my transformations before writing to a BigQuery table (writing to BigQuery works fine), and the Cloud Function fires when a file is uploaded to the GCS bucket. However, the Cloud Function doesn't trigger the Dataflow job that I built in Dataprep.

Please have a look at the sample code for my Cloud Function below; any pointers as to why the Dataflow job is not triggering would be appreciated.

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * @param {!Object} event The Cloud Functions event.
 * @param {!Function} callback The callback function.
 */
exports.processFile = (event, callback) => {
  console.log('Processing file: ' + event.data.name);
  callback();

  const google = require('googleapis');

 exports.CF_GCStoDataFlow_v2 = function(event, callback) {
  const file = event.data;
  if (file.resourceState === 'exists' && file.name) {
    google.auth.getApplicationDefault(function (err, authClient, projectId) {
      if (err) {
        throw err;
      }

      if (authClient.createScopedRequired && authClient.createScopedRequired()) {
        authClient = authClient.createScoped([
          'https://www.googleapis.com/auth/cloud-platform',
          'https://www.googleapis.com/auth/userinfo.email'
        ]);
      }

      const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });

      dataflow.projects.templates.create({
        projectId: projectId,
        resource: {
          parameters: {
            inputFile: `gs://${file.bucket}/${file.name}`,
            outputFile: `gs://${file.bucket}/${file.name}`
          },
          jobName: 'cloud-dataprep-csvtobq-v2-281345',
          gcsPath: 'gs://mygcstest-pipeline-staging/temp/'
        }
      }, function(err, response) {
        if (err) {
          console.error("problem running dataflow template, error was: ", err);
        }
        console.log("Dataflow template response: ", response);
        callback();
      });

    });
  }
 };
};

[Screenshot: DataProc job]

Upvotes: 1

Views: 2876

Answers (3)

Ángel Ramos

Reputation: 56

This snippet may help. It uses a different method of the Dataflow API (launch), and it worked for me. Be aware that you need to specify the template's URL, and also check the metadata file (you can find it in the same directory as the template once it has been executed through the Dataprep interface) to confirm you are including the right parameters.

dataflow.projects.templates.launch({
  projectId: projectId,
  location: location,
  gcsPath: jobTemplateUrl,
  resource: {
    parameters: {
      inputLocations: `{"location1": "gs://${file.bucket}/${file.name}"}`,
      outputLocations: `{"location1": "gs://${destination.bucket}/${destination.name}"}`
    },
    environment: {
      tempLocation: `gs://${destination.bucket}/${destination.tempFolder}`,
      zone: "us-central1-f"
    },
    jobName: 'my-job-name'
  }
}, function (err, response) {
  if (err) {
    console.error("problem launching dataflow template, error was: ", err);
  }
  console.log("Dataflow template response: ", response);
  callback();
});
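For reference, location, jobTemplateUrl and destination are not defined in the snippet above; a minimal sketch of what they might look like, where the region, template path and bucket names are placeholder assumptions rather than values from the original post:

// Hypothetical values -- replace with your own region, template path and buckets.
const location = 'us-central1';                                   // region where the Dataflow job runs
const jobTemplateUrl = 'gs://<TEMPLATE_BUCKET>/temp/<TEMPLATE>';  // path to the Dataflow template exported by Dataprep
const destination = {
  bucket: '<OUTPUT_BUCKET>',   // bucket the job writes to
  name: 'output/result.csv',   // output object name
  tempFolder: 'temp'           // staging folder for the Dataflow job
};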

Upvotes: 2

Federico Panunzio

Reputation: 964

Looks like you are putting CF_GCStoDataFlow_v2 inside processFile, so the Dataflow part of the code is not executing.

Your function should look like this:

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * @param {!Object} event The Cloud Functions event.
 * @param {!Function} callback The callback function.
 */
exports.CF_GCStoDataFlow_v2 = (event, callback) => {

  const google = require('googleapis');
  const file = event.data;

  if (file.resourceState === 'exists' && file.name) {
    google.auth.getApplicationDefault(function (err, authClient, projectId) {
      if (err) {
        throw err;
      }

      if (authClient.createScopedRequired && authClient.createScopedRequired()) {
        authClient = authClient.createScoped([
          'https://www.googleapis.com/auth/cloud-platform',
          'https://www.googleapis.com/auth/userinfo.email'
        ]);
      }

      const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });

      dataflow.projects.templates.create({
        projectId: projectId,
        resource: {
          parameters: {
            inputFile: `gs://${file.bucket}/${file.name}`,
            outputFile: `gs://${file.bucket}/${file.name}`
          },
          jobName: '<JOB_NAME>',
          gcsPath: '<BUCKET_NAME>'
        }
      }, function(err, response) {
        if (err) {
          console.error("problem running dataflow template, error was: ", err);
        }
        console.log("Dataflow template response: ", response);
        callback();
      });

    });
  } else {
    callback();
  }
};

Make sure you change the value under “Function to execute” to CF_GCStoDataFlow_v2
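If you deploy with the gcloud CLI rather than through the console, the entry point is set with the --entry-point flag; a rough sketch, where the bucket name and runtime are placeholders and not values from the original post:

gcloud functions deploy CF_GCStoDataFlow_v2 \
  --runtime nodejs8 \
  --entry-point CF_GCStoDataFlow_v2 \
  --trigger-resource <YOUR_BUCKET> \
  --trigger-event google.storage.object.finalize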

Upvotes: 1

Sammy

Reputation: 47

Have you submitted your Dataproc job? Has it started running? The documentation below can give you some ideas on how to get started:

https://cloud.google.com/dataproc/docs/concepts/jobs/life-of-a-job

Upvotes: 1
