kwn

Reputation: 919

Pass pyfiles and arguments to DataProcPySparkOperator

I am trying to pass arguments and a zipped pyfiles archive to an ephemeral Dataproc cluster in Cloud Composer:

spark_args = {
    'conn_id': 'spark_default',
    'num_executors': 2,
    'executor_cores': 2,
    'executor_memory': '2G',
    'driver_memory': '2G',
}

task = dataproc_operator.DataProcPySparkOperator(
    task_id='spark_preprocess_{}'.format(name),
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region='europe-west4',
    main='gs://my-bucket/dist/main.py',
    pyfiles='gs://my-bucket/dist/jobs.zip',
    dataproc_pyspark_properties=spark_args,
    arguments=['--name', 'test', '--date', self.date_exec],
    dag=subdag,
)

But I get the following error. Any idea how to correctly format the arguments?

Invalid value at 'job.pyspark_job.properties[1].value' (TYPE_STRING)

Upvotes: 2

Views: 3049

Answers (1)

Igor Dvorzhak

Reputation: 4457

As pointed out in the comments, the issue is that spark_args contains non-string values, but per the error message it must contain only strings:

Invalid value at 'job.pyspark_job.properties[1].value' (TYPE_STRING)
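A minimal sketch of a fix, based on the dict in the question, is to quote every value as a string. Note that the `properties` field of a Dataproc PySpark job expects Spark configuration keys (e.g. `spark.executor.cores`) rather than Airflow-style names, so the keys below are illustrative as well:

spark_args = {
    # all values must be strings, e.g. '2' instead of 2
    'spark.executor.instances': '2',
    'spark.executor.cores': '2',
    'spark.executor.memory': '2g',
    'spark.driver.memory': '2g',
}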

Upvotes: 1
