CCC

Reputation: 2761

easiest way to schedule a Google Cloud Dataflow job

I just need to run a Dataflow pipeline on a daily basis, but suggested solutions like the App Engine Cron Service, which requires building a whole web app, seem a bit too much to me. I was thinking about just running the pipeline from a cron job on a Compute Engine Linux VM, but maybe that's far too simple :). What's the problem with doing it that way? Why isn't anybody (besides me, I guess) suggesting it?

Upvotes: 18

Views: 15786

Answers (5)

Kevin Zhou

Reputation: 46

Original answer:

The following Terraform code works for me for scheduling a Dataflow job from a flex template with Cloud Scheduler:

data "google_project" "project" {}
resource "google_cloud_scheduler_job" "scheduler" {
  name = "scheduler-demo"
  schedule = "0 0 * * *"
  # This needs to be us-central1 even if the app engine is in us-central.
  # You will get a resource not found error if just using us-central.
  region = "us-central1"

  http_target {
    http_method = "POST"
    uri = "https://dataflow.googleapis.com/v1b3/projects/${var.project_id}/locations/${var.region}/flexTemplates:launch"
    oauth_token {
      service_account_email = google_service_account.cloud-scheduler-demo.email
    }

    # need to encode the string
    body = base64encode(<<-EOT
    {
      "launchParameter": {
        "jobName": "test-cloud-scheduler",
        "containerSpecGcsPath": "gs://BUCKET_PATH/FILE",
        "parameters": {
          "region": "${var.region}",
          "autoscalingAlgorithm": "THROUGHPUT_BASED",
        },
        "environment": {
          "maxWorkers": "10",
          "tempLocation": "gs://BUCKET_PATH/temp"
        }
      }
    }
EOT
    )
  }
}

Edit 25/04/24: The newly advised way seems to be Dataflow data pipelines (https://cloud.google.com/dataflow/docs/guides/data-pipelines#overview). It works with both classic and flex templates: https://registry.terraform.io/providers/hashicorp/google/5.26.0/docs/resources/data_pipeline_pipeline

Example of Terraform code for flex templates:

resource "google_data_pipeline_pipeline" "primary" {
  name         = "my-pipeline"
  display_name = "my-pipeline"
  type         = "PIPELINE_TYPE_BATCH"
  state        = "STATE_ACTIVE"
  region       = "us-central1"

  workload {
    dataflow_flex_template_request {
      project_id = var.project
      launch_parameter {
        container_spec_gcs_path = "gs://BUCKET_PATH/FILE"
        job_name                = "my-pipeline"
        parameters = {
          "region": "us-central1",
          "autoscalingAlgorithm": "THROUGHPUT_BASED",
        }
        environment {
          max_workers   = 10
          temp_location = "gs://BUCKET_PATH/temp"
        }
        update = false
      }
      location = "us-central1"
    }
  }
  schedule_info {
    schedule = "0 0 * * *"
  }
}

Upvotes: 0

Zhong Chen

Reputation: 21

You can use Cloud Scheduler to schedule your job as well. See my post:

https://medium.com/@zhongchen/schedule-your-dataflow-batch-jobs-with-cloud-scheduler-8390e0e958eb

Terraform script

data "google_project" "project" {}
resource "google_cloud_scheduler_job" "scheduler" {
  name = "scheduler-demo"
  schedule = "0 0 * * *"
  # This needs to be us-central1 even if the app engine is in us-central.
  # You will get a resource not found error if just using us-central.
  region = "us-central1"

  http_target {
    http_method = "POST"
    uri = "https://dataflow.googleapis.com/v1b3/projects/${var.project_id}/locations/${var.region}/templates:launch?gcsPath=gs://zhong-gcp/templates/dataflow-demo-template"
    oauth_token {
      service_account_email = google_service_account.cloud-scheduler-demo.email
    }

    # need to encode the string
    body = base64encode(<<-EOT
    {
      "jobName": "test-cloud-scheduler",
      "parameters": {
        "region": "${var.region}",
        "autoscalingAlgorithm": "THROUGHPUT_BASED",
      },
      "environment": {
        "maxWorkers": "10",
        "tempLocation": "gs://zhong-gcp/temp",
        "zone": "us-west1-a"
      }
    }
EOT
    )
  }
}

Upvotes: 2

twang

Reputation: 321

This is how I did it using Cloud Functions, PubSub, and Cloud Scheduler (this assumes you've already created a Dataflow template and it exists in your GCS bucket somewhere):

  1. Create a new topic in PubSub. This will be used to trigger the Cloud Function.

  2. Create a Cloud Function that launches a Dataflow job from a template. I find it easiest to just create this from the CF Console. Make sure the service account you choose has permission to create a Dataflow job. The function's index.js looks something like:

const google = require('googleapis');

exports.triggerTemplate = (event, context) => {
  // in this case the PubSub message payload and attributes are not used
  // but can be used to pass parameters needed by the Dataflow template
  const pubsubMessage = event.data;
  console.log(Buffer.from(pubsubMessage, 'base64').toString());
  console.log(event.attributes);

  google.google.auth.getApplicationDefault(function (err, authClient, projectId) {
    if (err) {
      console.error('Error occurred: ' + err.toString());
      throw new Error(err);
    }

    const dataflow = google.google.dataflow({ version: 'v1b3', auth: authClient });

    dataflow.projects.templates.create({
      projectId: projectId,
      resource: {
        parameters: {},
        jobName: 'SOME-DATAFLOW-JOB-NAME',
        gcsPath: 'gs://PATH-TO-YOUR-TEMPLATE'
      }
    }, function (err, response) {
      if (err) {
        console.error("Problem running dataflow template, error was: ", err);
      }
      console.log("Dataflow template response: ", response);
    });
  });
};

The package.json looks like

{
  "name": "pubsub-trigger-template",
  "version": "0.0.1",
  "dependencies": {
    "googleapis": "37.1.0",
    "@google-cloud/pubsub": "^0.18.0"
  }
}

  3. Go to PubSub and the topic you created, and manually publish a message. This should trigger the Cloud Function and start a Dataflow job.

  4. Use Cloud Scheduler to publish a PubSub message on schedule: https://cloud.google.com/scheduler/docs/tut-pub-sub (a sketch of creating that scheduler job programmatically follows below).
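
If you'd rather create that scheduler job programmatically instead of via the console or gcloud, here is a minimal sketch using the google-cloud-scheduler Python client. This is an assumption-laden illustration, not part of the original setup: the project, region, topic, and job names are placeholders, and the message payload is ignored by the function above.

# Hypothetical sketch: create a Cloud Scheduler job that publishes to the trigger topic daily.
# pip install google-cloud-scheduler
from google.cloud import scheduler_v1

project = "YOUR-PROJECT-ID"       # placeholder
location = "us-central1"          # must be a region Cloud Scheduler supports
topic = "YOUR-TRIGGER-TOPIC"      # the topic the Cloud Function is subscribed to

client = scheduler_v1.CloudSchedulerClient()
parent = f"projects/{project}/locations/{location}"

job = scheduler_v1.Job(
    name=f"{parent}/jobs/trigger-dataflow-daily",   # hypothetical job name
    schedule="0 0 * * *",                           # daily at midnight
    time_zone="Etc/UTC",
    pubsub_target=scheduler_v1.PubsubTarget(
        topic_name=f"projects/{project}/topics/{topic}",
        data=b"start",                              # payload isn't used by the function
    ),
)

created = client.create_job(parent=parent, job=job)
print("Created scheduler job:", created.name)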

Upvotes: 10

Davos

Reputation: 5395

There is a FAQ answer to that question: https://cloud.google.com/dataflow/docs/resources/faq#is_there_a_built-in_scheduling_mechanism_to_execute_pipelines_at_given_time_or_interval

  • You can automate pipeline execution by using Google App Engine (Flexible Environment only) or Cloud Functions.
  • You can use Apache Airflow's Dataflow Operator, one of several Google Cloud Platform Operators in a Cloud Composer workflow.
  • You can use custom (cron) job processes on Compute Engine.

The Cloud Functions approach is described as "Alpha", and it's still true that they don't have scheduling (no equivalent to an AWS CloudWatch scheduled event), only Pub/Sub messages, Cloud Storage changes, and HTTP invocations.

Cloud Composer looks like a good option. It's effectively a re-badged Apache Airflow, which is itself a great orchestration tool. Definitely not "too simple" like cron :)
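
For reference, here's a minimal sketch of what the Composer/Airflow route can look like, assuming Airflow 2 with the Google provider installed and a classic template already staged in GCS. The DAG id, project, and paths are placeholders, not anything from the original answer.

# Hypothetical Airflow DAG: launch a classic Dataflow template once a day.
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)

with DAG(
    dag_id="daily_dataflow_template",
    schedule_interval="0 0 * * *",   # daily at midnight
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    DataflowTemplatedJobStartOperator(
        task_id="launch_dataflow_template",
        template="gs://PATH-TO-YOUR-TEMPLATE",   # placeholder template path
        project_id="YOUR-PROJECT-ID",            # placeholder
        location="us-central1",
        parameters={},                           # template parameters, if any
    )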

Upvotes: 7

Graham Polley

Reputation: 14781

There's absolutely nothing wrong with using a cron job to kick off your Dataflow pipelines. We do it all the time for our production systems, whether they're our Java or Python developed pipelines.

That said, however, we are trying to wean ourselves off cron jobs and move more toward using either AWS Lambdas (we run multi-cloud) or Cloud Functions. Unfortunately, Cloud Functions don't have scheduling yet. AWS Lambdas do.
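
For what it's worth, the cron approach can be as small as a launcher script plus one crontab line. Below is a rough sketch (not our actual setup): a Python script using google-api-python-client with the VM's application default credentials to launch a classic template; the project, bucket, and template paths are placeholders.

# launch_dataflow.py -- hypothetical launcher a crontab entry could run, e.g.:
#   0 0 * * * python3 /opt/jobs/launch_dataflow.py
# pip install google-api-python-client
from googleapiclient.discovery import build

PROJECT = "YOUR-PROJECT-ID"              # placeholder
REGION = "us-central1"
TEMPLATE = "gs://PATH-TO-YOUR-TEMPLATE"  # staged classic template (placeholder)
TEMP_LOCATION = "gs://YOUR-BUCKET/temp"  # placeholder

# Application default credentials are picked up automatically on a Compute Engine VM.
dataflow = build("dataflow", "v1b3")

request = dataflow.projects().locations().templates().launch(
    projectId=PROJECT,
    location=REGION,
    gcsPath=TEMPLATE,
    body={
        "jobName": "daily-scheduled-job",
        "parameters": {},                              # template parameters, if any
        "environment": {"tempLocation": TEMP_LOCATION},
    },
)

print(request.execute())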

Upvotes: 7
