Reputation: 91599
I am using a Cloud Composer environment to run workflows in a GCP project. One of my workflows creates a Dataproc cluster in a different project using the DataprocClusterCreateOperator, and then attempts to submit a PySpark job to that cluster using the DataProcPySparkOperator from the airflow.contrib.operators.dataproc_operator module.
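For context, the cluster creation step looks roughly like this (the cluster name, zone, and worker count are placeholders):

from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator

create_cluster = DataprocClusterCreateOperator(
    task_id='create_cluster',
    project_id='my-gcp-project',  # the other project; honored here
    cluster_name='my-cluster',
    num_workers=2,
    zone='us-central1-a',
    region='us-central1',
)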
To create the cluster, I can specify a project_id parameter so the cluster is created in the other project, but it seems like DataProcPySparkOperator ignores this parameter. For example, I expect to be able to pass a project_id, but I end up with a 404 error when the task runs:
from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator

t1 = DataProcPySparkOperator(
    task_id='run_pyspark',        # required by BaseOperator
    project_id='my-gcp-project',  # the kwarg I expect to work
    cluster_name='my-cluster',    # placeholder
    main='...',
    arguments=[...],
)
How can I use DataProcPySparkOperator to submit a job in another project?
Upvotes: 0
Views: 1262
Reputation: 91599
The DataProcPySparkOperator from the airflow.contrib.operators.dataproc_operator module doesn't accept a project_id kwarg in its constructor, so it always submits Dataproc jobs in the project the Cloud Composer environment itself is in. Any project_id argument you pass is ignored, and the task then fails with a 404 error because the operator polls for the job under an incorrect cluster path.
One workaround is to copy the operator and its hook and modify them to accept a project ID (see the sketch below). An easier solution, however, is to use the newer operators from the airflow.providers packages if you are on a version of Airflow that supports them, because many airflow.contrib operators are deprecated in newer Airflow releases.
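If you do want the copy-and-modify route, a rough sketch follows. It relies on contrib hook internals (create_job_template, set_python_main, add_args, submit) as they exist in Airflow 1.10.x, and the class name and job_project_id kwarg are made up for illustration, so verify everything against your installed version:

from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator


class CrossProjectPySparkOperator(DataProcPySparkOperator):
    # Sketch only: submit the PySpark job in an explicit project.
    # Assumes main is already a GCS URI (skips the local-file upload step
    # the stock operator performs).

    def __init__(self, job_project_id, *args, **kwargs):
        super(CrossProjectPySparkOperator, self).__init__(*args, **kwargs)
        self.job_project_id = job_project_id  # project to submit the job in

    def execute(self, context):
        hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                            delegate_to=self.delegate_to)
        job = hook.create_job_template(self.task_id, self.cluster_name,
                                       'pysparkJob', self.dataproc_properties)
        job.set_python_main(self.main)
        job.add_args(self.arguments)
        job_spec = job.build()
        # The builder stamps the hook's default project into the job
        # reference, so overwrite that too before submitting.
        job_spec['job']['reference']['projectId'] = self.job_project_id
        hook.submit(self.job_project_id, job_spec, self.region)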
Below is an example of the providers approach. Note that this module also contains a newer DataprocSubmitPySparkJobOperator, but it is deprecated in favor of DataprocSubmitJobOperator, so you should use the latter, which accepts a project ID.
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

t1 = DataprocSubmitJobOperator(
    task_id='submit_pyspark_job',  # required by BaseOperator
    project_id='my-gcp-project-id',
    location='us-central1',
    job={...},
)
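For reference, the job argument follows the Dataproc API's Job structure. A minimal PySpark spec might look like the following (bucket, file, and cluster names are placeholders), passed to the operator above as job=pyspark_job:

pyspark_job = {
    'reference': {'project_id': 'my-gcp-project-id'},
    'placement': {'cluster_name': 'my-cluster'},
    'pyspark_job': {
        'main_python_file_uri': 'gs://my-bucket/job.py',
    },
}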
If you are running an environment with Composer 1.10.5+, Airflow version 1.10.6+, and Python 3, the providers are preinstalled and can be used immediately.
Upvotes: 3