Anjana

Reputation: 31

How to call a Google Dataproc job from a Google Cloud Function

I want to trigger a Cloud Function whenever a new file is uploaded to a Cloud Storage bucket. The function should call a Dataproc job, written in PySpark, that reads the file and loads it into BigQuery.

How can I call a Google Dataproc job from a Cloud Function? Please suggest.

Upvotes: 3

Views: 3153

Answers (1)

tix

Reputation: 2158

I was able to create a simple Cloud Function that triggers a Dataproc job on a GCS file-create event. In this example, the uploaded file contains a Pig query to execute, but you can follow the Dataproc API documentation to create a PySpark version.

index.js:

exports.submitJob = (event, callback) => {

  const google = require('googleapis');

  const projectId = 'my-project';
  const clusterName = 'my-cluster';

  const file = event.data;
  if (!file.name) {
    // Nothing to submit (e.g. a delete event); finish without doing anything.
    console.log("Skipped processing file!");
    callback();
    return;
  }

  google.auth.getApplicationDefault(function (err, authClient, projectId) {
    if (err) {
      callback(err);
      return;
    }

    const queryFileUri = "gs://" + file.bucket + "/" + file.name;
    console.log("Using queryFileUri: ", queryFileUri);

    // Some credential types must be scoped explicitly.
    if (authClient.createScopedRequired && authClient.createScopedRequired()) {
      authClient = authClient.createScoped([
        'https://www.googleapis.com/auth/cloud-platform',
        'https://www.googleapis.com/auth/userinfo.email'
      ]);
    }

    const dataproc = google.dataproc({ version: 'v1beta2', auth: authClient });

    dataproc.projects.regions.jobs.submit({
        projectId: projectId,
        region: "global",
        resource: {
          "job": {
            "placement": {"clusterName": clusterName},
            "pigJob": {
              "queryFileUri": queryFileUri,
            }
          }
        }
      }, function(err, response) {
        if (err) {
          console.error("Error submitting job: ", err);
        }
        console.log("Dataproc response: ", response);
        // Signal completion only after the submit call returns; calling
        // callback() earlier would let the function terminate before the
        // job is actually submitted.
        callback();
      });

  });
};

Make sure to set "Function to execute" to submitJob.
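For the PySpark version the asker wants, only the job payload changes: replace pigJob with pysparkJob and point mainPythonFileUri at a PySpark script you have already staged in GCS (the bucket and script path below are placeholders, not something this example provides). A sketch of just the request object, with the uploaded file's URI passed to the script as an argument:

```javascript
// Assumed values; in the function above these come from config and the event.
const projectId = 'my-project';
const clusterName = 'my-cluster';
const uploadedFileUri = 'gs://my-bucket/incoming/data.csv'; // the new GCS file

// Hypothetical request for dataproc.projects.regions.jobs.submit.
// mainPythonFileUri must point to a PySpark script you staged yourself;
// the script receives the uploaded file's URI via args.
const pySparkRequest = {
  projectId: projectId,
  region: "global",
  resource: {
    "job": {
      "placement": {"clusterName": clusterName},
      "pysparkJob": {
        "mainPythonFileUri": "gs://my-bucket/scripts/load_to_bq.py",
        "args": [uploadedFileUri]
      }
    }
  }
};
```

The PySpark script itself would then read args[0] and write the result to BigQuery.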

package.json:

{
  "name": "sample-cloud-storage",
  "version": "0.0.1",
  "dependencies":{ "googleapis": "^21.3.0" }
}

The following blog post gave me many ideas on how to get started: https://cloud.google.com/blog/big-data/2016/04/scheduling-dataflow-pipelines-using-app-engine-cron-service-or-cloud-functions

Upvotes: 2
