Frank Pinto

Reputation: 163

Airflow Dataproc serverless job creator doesn't take Python parameters

I'm trying to set up a Dataproc Serverless batch job from Google Cloud Composer using the DataprocCreateBatchOperator operator, passing some arguments that affect the underlying Python code. However, I'm running into the following error:

error: unrecognized arguments: --run_timestamp "2022-06-17T13:22:51.800834+00:00" --temp_bucket "gs://pipeline/spark_temp_bucket/hourly/" --bucket "pipeline" --pipeline "hourly"

This is how my operator is setup:

import random
import string

from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
)

create_batch = DataprocCreateBatchOperator(
    task_id="hourly_pipeline",
    project_id="dev",
    region="us-west1",
    batch_id="".join(
        random.choice(string.ascii_lowercase + string.digits + "-") for _ in range(40)
    ),
    batch={
        "environment_config": {
            "execution_config": {
                "service_account": "<service_account>",
                "subnetwork_uri": "<uri>",
            }
        },
        "pyspark_batch": {
            "main_python_file_uri": "gs://pipeline/code/pipeline_feat_creation.py",
            "args": [
                '--run_timestamp "{{ ts }}"',
                '--temp_bucket "gs://pipeline/spark_temp_bucket/hourly/"',
                '--bucket "pipeline"',
                '--pipeline "hourly"',
            ],
            "jar_file_uris": [
                "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.25.0.jar"
            ],
        },
    },
)

Regarding the args array: I've tried setting the parameters both with and without wrapping the values in ". I've also already done a gcloud submit that worked, like so:

gcloud dataproc batches submit pyspark "gs://pipeline/code/pipeline_feat_creation.py" \
--batch=jskdnkajsnd-test-10 --region=us-west1 --subnet="<uri>" \
-- --run_timestamp "2020-01-01" --temp_bucket gs://pipeline/spark_temp_bucket/hourly/ --bucket pipeline --pipeline hourly
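
As far as I can tell, the shell splits the gcloud command line into separate tokens before they reach the script, while each element of the operator's args list arrives as a single argv token. Here's a minimal repro of what I think the script's parser sees; the parser itself is an assumption (pipeline_feat_creation.py isn't shown), only the flag names come from the error above:

import argparse

# Assumed sketch of the flag parsing inside pipeline_feat_creation.py;
# the flag names are taken from the error message.
parser = argparse.ArgumentParser()
for flag in ("--run_timestamp", "--temp_bucket", "--bucket", "--pipeline"):
    parser.add_argument(flag)

# The shell splits the gcloud command line, so the script receives
# separate tokens and parsing succeeds:
parser.parse_args(["--run_timestamp", "2020-01-01", "--bucket", "pipeline"])

# Each element of the operator's "args" list is forwarded verbatim as a
# single argv token. A token like '--run_timestamp "2020-01-01"' contains
# a space, so argparse treats it as a positional and exits with
# 'error: unrecognized arguments: ...', matching the error above:
parser.parse_args(['--run_timestamp "2020-01-01"', '--bucket "pipeline"'])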

Upvotes: 1

Views: 1237

Answers (1)

Frank Pinto

Reputation: 163

The error I was running into was caused by not putting an = between each parameter name and its value; I've also dropped the " quoting around each value. This is how the args are now set up:

"args": [
    '--run_timestamp={{ ts }}',
    '--temp_bucket=gs://pipeline/spark_temp_bucket/hourly/',
    '--bucket=pipeline',
    '--pipeline=hourly'
]
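
For reference, here's a quick way to see why the = form works: argparse splits a single --flag=value token on the =, so each list element parses cleanly on its own. This is a minimal sketch; the parser in pipeline_feat_creation.py is assumed, as in the question:

import argparse

# Same assumed parser as above; each "args" element reaches the script
# as exactly one argv token, and argparse splits "--flag=value" on "=".
parser = argparse.ArgumentParser()
for flag in ("--run_timestamp", "--temp_bucket", "--bucket", "--pipeline"):
    parser.add_argument(flag)

ns = parser.parse_args([
    "--run_timestamp=2022-06-17T13:22:51.800834+00:00",
    "--temp_bucket=gs://pipeline/spark_temp_bucket/hourly/",
    "--bucket=pipeline",
    "--pipeline=hourly",
])
print(ns.run_timestamp)  # -> 2022-06-17T13:22:51.800834+00:00

Note that {{ ts }} still renders here, since batch is one of the operator's templated fields (so Airflow substitutes the timestamp before the batch is submitted).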

Upvotes: 2
