Reputation: 2761
I just need to run a Dataflow pipeline on a daily basis, but it seems to me that the suggested solutions, like the App Engine Cron Service, which requires building a whole web app, are a bit too much. I was thinking about just running the pipeline from a cron job in a Compute Engine Linux VM, but maybe that's far too simple :). What's the problem with doing it that way? Why isn't anybody (besides me, I guess) suggesting it?
Upvotes: 18
Views: 15786
Reputation: 46
Original answer:
The following Terraform code works for me for scheduling a Dataflow job from a flex template using Cloud Scheduler:
data "google_project" "project" {}
resource "google_cloud_scheduler_job" "scheduler" {
name = "scheduler-demo"
schedule = "0 0 * * *"
# This needs to be us-central1 even if the app engine is in us-central.
# You will get a resource not found error if just using us-central.
region = "us-central1"
http_target {
http_method = "POST"
uri = "https://dataflow.googleapis.com/v1b3/projects/${var.project_id}/locations/${var.region}/flexTemplates:launch"
oauth_token {
service_account_email = google_service_account.cloud-scheduler-demo.email
}
# need to encode the string
body = base64encode(<<-EOT
{
"launchParameter": {
"jobName": "test-cloud-scheduler",
"containerSpecGcsPath": "gs://BUCKET_PATH/FILE",
"parameters": {
"region": "${var.region}",
"autoscalingAlgorithm": "THROUGHPUT_BASED",
},
"environment": {
"maxWorkers": "10",
"tempLocation": "gs://BUCKET_PATH/temp"
}
}
}
EOT
)
}
}
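Note that the oauth_token block refers to a service account that isn't defined in this snippet. A minimal sketch of what it could look like; the IAM roles below are my assumption of what's needed to launch Dataflow jobs, so adjust them to your own setup:

resource "google_service_account" "cloud-scheduler-demo" {
  account_id   = "cloud-scheduler-demo"
  display_name = "Cloud Scheduler Dataflow launcher"
}

# Assumed roles: roles/dataflow.admin to launch jobs,
# roles/iam.serviceAccountUser to act as the Dataflow worker service account
resource "google_project_iam_member" "scheduler_dataflow_admin" {
  project = var.project_id
  role    = "roles/dataflow.admin"
  member  = "serviceAccount:${google_service_account.cloud-scheduler-demo.email}"
}

resource "google_project_iam_member" "scheduler_sa_user" {
  project = var.project_id
  role    = "roles/iam.serviceAccountUser"
  member  = "serviceAccount:${google_service_account.cloud-scheduler-demo.email}"
}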
Edit 25/04/24: The newly advised way seems to be Dataflow Data Pipelines: https://cloud.google.com/dataflow/docs/guides/data-pipelines#overview. It works with both classic and flex templates: https://registry.terraform.io/providers/hashicorp/google/5.26.0/docs/resources/data_pipeline_pipeline
Example of Terraform code for flex templates:
resource "google_data_pipeline_pipeline" "primary" {
name = "my-pipeline"
display_name = "my-pipeline"
type = "PIPELINE_TYPE_BATCH"
state = "STATE_ACTIVE"
region = "us-central1"
workload {
dataflow_flex_template_request {
project_id = var.project
launch_parameter {
container_spec_gcs_path = "gs://BUCKET_PATH/FILE"
job_name = "my-pipeline"
parameters = {
"region": "us-central1",
"autoscalingAlgorithm": "THROUGHPUT_BASED",
}
environment {
max_workers = 10
temp_location = "gs://BUCKET_PATH/FILE"
}
update = false
}
location = "us-central1"
}
}
schedule_info {
schedule = "0 0 * * *"
}
}
Upvotes: 0
Reputation: 21
You can use Cloud Scheduler to schedule your job as well. See my post:
https://medium.com/@zhongchen/schedule-your-dataflow-batch-jobs-with-cloud-scheduler-8390e0e958eb
Terraform script:
data "google_project" "project" {}
resource "google_cloud_scheduler_job" "scheduler" {
name = "scheduler-demo"
schedule = "0 0 * * *"
# This needs to be us-central1 even if the app engine is in us-central.
# You will get a resource not found error if just using us-central.
region = "us-central1"
http_target {
http_method = "POST"
uri = "https://dataflow.googleapis.com/v1b3/projects/${var.project_id}/locations/${var.region}/templates:launch?gcsPath=gs://zhong-gcp/templates/dataflow-demo-template"
oauth_token {
service_account_email = google_service_account.cloud-scheduler-demo.email
}
# need to encode the string
body = base64encode(<<-EOT
{
"jobName": "test-cloud-scheduler",
"parameters": {
"region": "${var.region}",
"autoscalingAlgorithm": "THROUGHPUT_BASED",
},
"environment": {
"maxWorkers": "10",
"tempLocation": "gs://zhong-gcp/temp",
"zone": "us-west1-a"
}
}
EOT
)
}
}
Upvotes: 2
Reputation: 321
This is how I did it using Cloud Functions, Pub/Sub, and Cloud Scheduler (this assumes you've already created a Dataflow template and it exists somewhere in your GCS bucket).
Create a new topic in Pub/Sub. This will be used to trigger the Cloud Function.
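If you'd rather manage the topic with Terraform (as in the other answers here), a minimal sketch; the topic name below is just a placeholder of mine:

resource "google_pubsub_topic" "dataflow_trigger" {
  # hypothetical name; this is the topic the Cloud Function will be subscribed to
  name = "dataflow-trigger"
}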
Create a Cloud Function that launches a Dataflow job from a template. I find it easiest to create this from the Cloud Functions console. Make sure the service account you choose has permission to create a Dataflow job. The function's index.js looks something like:
const google = require('googleapis');

exports.triggerTemplate = (event, context) => {
  // in this case the PubSub message payload and attributes are not used
  // but can be used to pass parameters needed by the Dataflow template
  const pubsubMessage = event.data;
  console.log(Buffer.from(pubsubMessage, 'base64').toString());
  console.log(event.attributes);

  google.google.auth.getApplicationDefault(function (err, authClient, projectId) {
    if (err) {
      console.error('Error occurred: ' + err.toString());
      throw new Error(err);
    }

    const dataflow = google.google.dataflow({ version: 'v1b3', auth: authClient });

    dataflow.projects.templates.create({
      projectId: projectId,
      resource: {
        parameters: {},
        jobName: 'SOME-DATAFLOW-JOB-NAME',
        gcsPath: 'gs://PATH-TO-YOUR-TEMPLATE'
      }
    }, function (err, response) {
      if (err) {
        console.error("Problem running dataflow template, error was: ", err);
      }
      console.log("Dataflow template response: ", response);
    });
  });
};
The package.json looks like:
{
  "name": "pubsub-trigger-template",
  "version": "0.0.1",
  "dependencies": {
    "googleapis": "37.1.0",
    "@google-cloud/pubsub": "^0.18.0"
  }
}
Go to Pub/Sub and the topic you created, and manually publish a message. This should trigger the Cloud Function and start a Dataflow job.
Use Cloud Scheduler to publish a Pub/Sub message on a schedule: https://cloud.google.com/scheduler/docs/tut-pub-sub
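For reference, that last step can also be expressed in Terraform, to match the other answers in this thread. A minimal sketch, assuming the topic from step 1 is called dataflow-trigger and that project_id is a variable you already define elsewhere:

resource "google_cloud_scheduler_job" "trigger_dataflow" {
  name     = "trigger-dataflow-daily"   # hypothetical job name
  schedule = "0 0 * * *"                # daily at midnight
  region   = "us-central1"

  pubsub_target {
    # full resource name of the topic the Cloud Function is subscribed to
    topic_name = "projects/${var.project_id}/topics/dataflow-trigger"
    # the function above doesn't require any particular payload,
    # but Cloud Scheduler requires either data or attributes to be set
    data = base64encode("run")
  }
}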
Upvotes: 10
Reputation: 5395
There is a FAQ answer to that question: https://cloud.google.com/dataflow/docs/resources/faq#is_there_a_built-in_scheduling_mechanism_to_execute_pipelines_at_given_time_or_interval
- You can automate pipeline execution by using Google App Engine (Flexible Environment only) or Cloud Functions.
- You can use Apache Airflow's Dataflow Operator, one of several Google Cloud Platform Operators in a Cloud Composer workflow.
- You can use custom (cron) job processes on Compute Engine.
The Cloud Functions approach is described as "Alpha", and it's still true that Cloud Functions have no built-in scheduling (no equivalent to an AWS CloudWatch scheduled event); they can only be triggered by Pub/Sub messages, Cloud Storage changes, or HTTP invocations.
Cloud Composer looks like a good option. It's effectively a re-badged Apache Airflow, which is itself a great orchestration tool. Definitely not "too simple" like cron :)
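If you go the Composer route and want to stay consistent with the Terraform examples elsewhere in this thread, a purely illustrative sketch of provisioning the environment might look like the following; the daily trigger itself would then live in an Airflow DAG that uses the Dataflow operator:

resource "google_composer_environment" "dataflow_scheduler" {
  name   = "dataflow-scheduler"   # hypothetical environment name
  region = "us-central1"
}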
Upvotes: 7
Reputation: 14781
There's absolutely nothing wrong with using a cron job to kick off your Dataflow pipelines. We do it all the time for our production systems, whether they're our Java or Python developed pipelines.
That said, we are trying to wean ourselves off cron jobs and move toward using either AWS Lambdas (we run multi-cloud) or Cloud Functions. Unfortunately, Cloud Functions don't have scheduling yet. AWS Lambdas do.
Upvotes: 7