hexacyanide

Reputation: 91599

Cannot configure a GCP project when using DataProcPySparkOperator

I am using a Cloud Composer environment to run workflows in a GCP project. One of my workflows creates a Dataproc cluster in a different project using the DataprocClusterCreateOperator, and then attempts to submit a PySpark job to that cluster using the DataProcPySparkOperator from the airflow.contrib.operators.dataproc_operator module.

To create the cluster, I can specify a project_id parameter to create it in another project, but it seems like DataProcPySparkOperator ignores this parameter. For example, I expect to be able to pass a project_id, but I end up with a 404 error when the task runs:

from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator

t1 = DataProcPySparkOperator(
  task_id='...',
  project_id='my-gcp-project',
  main='...',
  arguments=[...],
)

How can I use DataProcPySparkOperator to submit a job in another project?

Upvotes: 0

Views: 1262

Answers (1)

hexacyanide

Reputation: 91599

The DataProcPySparkOperator from the airflow.contrib.operators.dataproc_operator module doesn't accept a project_id kwarg in its constructor, so it always submits Dataproc jobs to the project the Cloud Composer environment is in. Any project_id argument you pass is ignored, and the task then fails with a 404 error because the operator polls for the job using an incorrect cluster path.

One workaround is to copy the operator and hook and modify them to accept a project ID. An easier solution, however, is to use the newer operators from the airflow.providers packages if you are on a version of Airflow that supports them, since many airflow.contrib operators are deprecated in newer Airflow releases.

Below is an example. Note that this module also contains a DataprocSubmitPySparkJobOperator, but it is deprecated in favor of DataprocSubmitJobOperator, so use the latter, which accepts a project ID.

from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

t1 = DataprocSubmitJobOperator(
  task_id='...',
  project_id='my-gcp-project-id',
  location='us-central1',
  job={...},
)
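
For completeness, the job argument is a Dataproc Jobs API job object. Here is a minimal sketch of what the payload might look like for a PySpark job; the cluster name and GCS path are placeholder values:

# Example Dataproc job payload for a PySpark job (placeholder values)
pyspark_job = {
  'reference': {'project_id': 'my-gcp-project-id'},
  'placement': {'cluster_name': 'my-cluster'},
  'pyspark_job': {'main_python_file_uri': 'gs://my-bucket/path/to/job.py'},
}

This dictionary would then be passed to the operator above as job=pyspark_job.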

If you are running an environment with Composer 1.10.5+, Airflow version 1.10.6+, and Python 3, the providers are preinstalled and can be used immediately.

Upvotes: 3
