Cam

Reputation: 2208

HttpError 400 when trying to run DataProcSparkOperator task from a local Airflow

I'm testing a DAG on a local install of Airflow that I previously had running on Google Cloud Composer without error. The DAG spins up a Google Dataproc cluster, runs a Spark job (a JAR file located in a GCS bucket), then spins down the cluster.

The DataProcSparkOperator task fails immediately each time with the following error:

googleapiclient.errors.HttpError: <HttpError 400 when requesting https://dataproc.googleapis.com/v1beta2/projects//regions/global/jobs:submit?alt=json returned "Invalid resource field value in the request.">

The project segment of the request URI is empty (projects//regions/global/...), so the URI looks incomplete, but I am not sure what is causing it. Below is the meat of my DAG. All the other tasks execute without error, and the only difference is that the DAG is no longer running on Composer:

default_dag_args = {
    'start_date': yesterday,
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 0,
    'retry_delay': dt.timedelta(seconds=30),
    'project_id': models.Variable.get('gcp_project'),
    'cluster_name': 'susi-bsm-cluster-{{ ds_nodash }}'
}

def slack():
    '''Posts to Slack if the Spark job fails'''
    text = ':x: The DAG *{}* broke and I am not smart enough to fix it. Check the StackDriver and DataProc logs.'.format(DAG_NAME)
    s.post_slack(SLACK_URI, text)

with DAG(DAG_NAME, schedule_interval='@once',
         default_args=default_dag_args) as dag:
    # pylint: disable=no-value-for-parameter

    delete_existing_parquet = bo.BashOperator(
        task_id = 'delete_existing_parquet',
        bash_command = 'gsutil rm -r {}/susi/bsm/bsm.parquet'.format(GCS_BUCKET)
    )

    create_dataproc_cluster = dpo.DataprocClusterCreateOperator(
        task_id = 'create_dataproc_cluster',
        num_workers = num_workers_override or models.Variable.get('default_dataproc_workers'),
        zone = models.Variable.get('gce_zone'),
        init_actions_uris = ['gs://cjones-composer-test/susi/susi-bsm-dataproc-init.sh'],
        trigger_rule = trigger_rule.TriggerRule.ALL_DONE
    )

    run_spark_job = dpo.DataProcSparkOperator(
        task_id = 'run_spark_job',
        main_class = MAIN_CLASS,
        dataproc_spark_jars = [MAIN_JAR],
        arguments = ['{}/susi.conf'.format(CONF_DEST), DATE_CONST]
    )

    notify_on_fail = po.PythonOperator(
        task_id = 'output_to_slack',
        python_callable = slack,
        trigger_rule = trigger_rule.TriggerRule.ONE_FAILED
    )

    delete_dataproc_cluster = dpo.DataprocClusterDeleteOperator(
        task_id = 'delete_dataproc_cluster',
        trigger_rule = trigger_rule.TriggerRule.ALL_DONE
    )

    delete_existing_parquet >> create_dataproc_cluster >> run_spark_job >> delete_dataproc_cluster >> notify_on_fail

Any assistance with this would be much appreciated!

Upvotes: 1

Views: 1245

Answers (1)

cyxxy

Reputation: 608

Unlike the DataprocClusterCreateOperator, the DataProcSparkOperator does not take the project_id as a parameter. It gets it from the Airflow connection (if you do not specify the gcp_conn_id parameter, it defaults to google_cloud_default). You have to configure your connection.
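Here is a minimal sketch (not from the original answer) of configuring the google_cloud_default connection programmatically; the project id and key file path are placeholders for your own values, and the extra__google_cloud_platform__* keys assume the Airflow 1.x GCP hook:

import json

from airflow import settings
from airflow.models import Connection

# Fetch the existing google_cloud_default connection, or create it if missing.
session = settings.Session()
conn = (
    session.query(Connection)
    .filter(Connection.conn_id == 'google_cloud_default')
    .one_or_none()
)
if conn is None:
    conn = Connection(conn_id='google_cloud_default')
    session.add(conn)

# The GCP hooks read the project id and credentials from the connection extras.
conn.conn_type = 'google_cloud_platform'
conn.extra = json.dumps({
    'extra__google_cloud_platform__project': 'my-gcp-project',          # placeholder
    'extra__google_cloud_platform__key_path': '/path/to/keyfile.json',  # placeholder
})
session.commit()

You can set the same fields through the Airflow UI under Admin > Connections instead.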

The reason you don't see this while running the DAG in Composer is that Composer configures the google_cloud_default connection for you.
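As a side note (not part of the original answer), if your credentials live under a connection with a different id, you can point the operator at it explicitly via the gcp_conn_id parameter; my_gcp_conn below is a hypothetical connection id:

    run_spark_job = dpo.DataProcSparkOperator(
        task_id = 'run_spark_job',
        main_class = MAIN_CLASS,
        dataproc_spark_jars = [MAIN_JAR],
        arguments = ['{}/susi.conf'.format(CONF_DEST), DATE_CONST],
        gcp_conn_id = 'my_gcp_conn'  # hypothetical id; defaults to google_cloud_default
    )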

Upvotes: 3
