Sam McVeety
Sam McVeety

Reputation: 3214

Launching Cloud Dataflow from Cloud Functions

How do I launch a Cloud Dataflow job from a Google Cloud Function? I'd like to use Google Cloud Functions as a mechanism to enable cross-service composition.

Upvotes: 5

Views: 2893

Answers (2)

bigbounty
bigbounty

Reputation: 17368

The best way is to launch is via cloud function but be careful, if you are using the cloud function for google cloud storage, then for every file uploaded a dataflow job will be launched.

const { google } = require('googleapis');

const templatePath = "gs://template_dir/df_template;
const project = "<project_id>";
const tempLoc = "gs://tempLocation/";

exports.PMKafka = (data, context, callback) => {
    const file = data;

    console.log(`Event ${context.eventId}`);
    console.log(`Event Type: ${context.eventType}`);
    console.log(`Bucket Name: ${file.bucket}`);
    console.log(`File Name: ${file.name}`);
    console.log(`Metageneration: ${file.metageneration}`);
    console.log(`Created: ${file.timeCreated}`);
    console.log(`Updated: ${file.updated}`);
    console.log(`Uploaded File Name - gs://${file.bucket}/${file.name}`);

    google.auth.getApplicationDefault(function (err, authClient, projectId) {
        if (err) {
            throw err;
        }

        if (authClient.createScopedRequired && authClient.createScopedRequired()) {
            authClient = authClient.createScoped(authScope);
        }

        const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });

        var inputDict= {
            inputFile: `gs://${file.bucket}/${file.name}`,
            ...
            ...
            <other_runtime_parameters>
        };

        var env = {
            tempLocation: tempLoc
        };

        var resource_opts = {
            parameters: inputDict,
            environment: env,
            jobName: config.jobNamePrefix + "-" + new Date().toISOString().toLowerCase().replace(":","-").replace(".","-")
        };

        var opts = {
            gcsPath: templatePath,
            projectId: project,
            resource: resource_opts
        }

        console.log(`Dataflow Run Time Options - ${JSON.stringify(opts)}`)

        dataflow.projects.templates.launch(opts, function (err, response) {
            if (err) {
                console.error("problem running dataflow template, error was: ", err);
                slack.publishMessage(null, null, false, err);
                return;
            }
            console.log("Dataflow template response: ", response);
            var jobid = response["data"]["job"]["id"];
            console.log("Dataflow Job ID: ", jobid);
        });
        callback();
    });
};

Upvotes: 1

Sam McVeety
Sam McVeety

Reputation: 3214

I've included a very basic example of the WordCount sample below. Please note that you'll need to include a copy of the java binary in your Cloud Function deployment, since it is not in the default environment. Likewise, you'll need to package your deploy jar with your Cloud Function as well.

module.exports = {
  wordcount: function (context, data) {
    const spawn = require('child_process').spawn;
    const child = spawn(
            'jre1.8.0_73/bin/java',
            ['-cp',
             'MY_JAR.jar',
             'com.google.cloud.dataflow.examples.WordCount',
             '--jobName=fromACloudFunction',
             '--project=MY_PROJECT',
             '--runner=BlockingDataflowPipelineRunner',
             '--stagingLocation=gs://STAGING_LOCATION',
             '--inputFile=gs://dataflow-samples/shakespeare/*',
             '--output=gs://OUTPUT_LOCATION'
            ],
            { cwd: __dirname });

    child.stdout.on('data', function(data) {
      console.log('stdout: ' + data);
    });
    child.stderr.on('data', function(data) {
      console.log('error: ' + data);
    });
    child.on('close', function(code) {
      console.log('closing code: ' + code);
    });
    context.success();
  }
}

You could further enhance this example by using the non-blocking runner and having the function return the Job ID, so that you can poll for job completion separately. This pattern should be valid for other SDKs as well, so long as their dependencies can be packaged into the Cloud Function.

Upvotes: 9

Related Questions