Reputation: 802
I have a PySpark job on GCP Dataproc that is triggered from Airflow, defined as shown below:
config = help.loadJSON("batch/config_file")
MY_PYSPARK_JOB = {
    "reference": {"project_id": "my_project_id"},
    "placement": {"cluster_name": "my_cluster_name"},
    "pyspark_job": {
        "main_python_file_uri": "gs://file/loc/my_spark_file.py",
        "properties": config["spark_properties"],
        "args": <TO_BE_ADDED>,
    },
}
I need to supply command-line arguments to this PySpark job, equivalent to how I run it from the command line:
spark-submit gs://file/loc/my_spark_file.py --arg1 val1 --arg2 val2
I am reading the arguments in my PySpark job with "configparser", so arg1 is the key and val1 is the value in the spark-submit command above.
How do I define the "args" parameter in MY_PYSPARK_JOB above so that it is equivalent to these command-line arguments?
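For orientation, here is a small sketch using only the standard-library shlex module. It splits the spark-submit command above and keeps the tokens that follow the main Python file; those are the script's own arguments, which is what the job's "args" field has to reproduce as a list of strings. The command string is copied from the question; nothing else is assumed.

```python
import shlex

# spark-submit command from the question
command = "spark-submit gs://file/loc/my_spark_file.py --arg1 val1 --arg2 val2"
main_python_file_uri = "gs://file/loc/my_spark_file.py"

tokens = shlex.split(command)
# Everything after the main Python file is handed to the script itself,
# which is what PySparkJob's "args" field carries (one string per element).
script_args = tokens[tokens.index(main_python_file_uri) + 1:]

print(script_args)  # ['--arg1', 'val1', '--arg2', 'val2']
```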
Upvotes: 5
Views: 1945
Reputation: 802
I finally managed to solve this. If we are using ConfigParser, the key has to be specified as below, regardless of whether the argument is passed on the command line or through Airflow:
--arg1
In Airflow, the arguments are passed as a Sequence[str] (as mentioned by @Betjens below), and each argument is written as:
arg1=val1
Therefore, for my requirement, the command-line arguments are defined as follows:
"args": ["--arg1=val1",
"--arg2=val2"]
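To see why the "--arg1=val1" form works on the driver side, here is a sketch that assumes the script parses its flags with the standard-library argparse module (an assumption: the thread mentions configparser, which reads INI-style files, whereas argparse is the stdlib module for command-line flags). With argparse, one "--key=value" element and the spark-submit style of separate flag and value elements parse to the same result. The function name parse_job_args is hypothetical.

```python
import argparse


def parse_job_args(argv):
    """Hypothetical parser; in the script this would be called with sys.argv[1:]."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--arg1")
    parser.add_argument("--arg2")
    return parser.parse_args(argv)


# The form from the answer: one list element per "--key=value" pair.
equals_form = parse_job_args(["--arg1=val1", "--arg2=val2"])
# The spark-submit form: flag and value as separate list elements.
split_form = parse_job_args(["--arg1", "val1", "--arg2", "val2"])

print(equals_form.arg1, equals_form.arg2)  # val1 val2
print(equals_form == split_form)  # True
```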
PS: Thank you @Betjens for all your suggestions.
Upvotes: 4
Reputation: 1391
You have to pass a Sequence[str]. If you check DataprocSubmitJobOperator, you will see that its job parameter is a google.cloud.dataproc_v1.types.Job (or a dict of the same form):
class DataprocSubmitJobOperator(BaseOperator):
    ...
    :param job: Required. The job resource. If a dict is provided, it must be of the same form as the protobuf message
        :class:`~google.cloud.dataproc_v1.types.Job`
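Because a dict of the same form as Job is accepted, a minimal sketch of the completed job dict, with "args" as a list of strings, wired into the operator might look like this. Assumptions: the apache-airflow-providers-google package is installed when make_submit_task is called; the task_id and the region passed in are hypothetical; "properties" is commented out because config comes from the asker's own helper.

```python
# Same shape as google.cloud.dataproc_v1.types.Job; names are the question's placeholders.
MY_PYSPARK_JOB = {
    "reference": {"project_id": "my_project_id"},
    "placement": {"cluster_name": "my_cluster_name"},
    "pyspark_job": {
        "main_python_file_uri": "gs://file/loc/my_spark_file.py",
        # "properties": config["spark_properties"],  # from the asker's config helper
        "args": ["--arg1=val1", "--arg2=val2"],  # Sequence[str], one element per argument
    },
}


def make_submit_task(region):
    """Build the Airflow task; requires apache-airflow-providers-google at call time."""
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocSubmitJobOperator,
    )

    return DataprocSubmitJobOperator(
        task_id="submit_my_pyspark_job",  # hypothetical task id
        job=MY_PYSPARK_JOB,
        region=region,  # e.g. the cluster's region; value is up to you
        project_id="my_project_id",
    )
```

Inside a DAG definition this would be called as, for example, make_submit_task("your-region").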
So, in the documentation for the PySpark job type, google.cloud.dataproc_v1.types.PySparkJob:
args (Sequence[str]): Optional. The arguments to pass to the driver. Do not include arguments, such as --conf, that can be set as job properties, since a collision may occur that causes an incorrect job submission.
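If the argument values live in a mapping, a small hypothetical helper (to_driver_args is not part of any library) can turn them into the Sequence[str] that PySparkJob.args expects, using the "--key=value" form. It rejects a "conf" key, in the spirit of the warning above that Spark settings belong in the job properties rather than in args.

```python
from typing import List, Mapping


def to_driver_args(options: Mapping[str, object]) -> List[str]:
    """Hypothetical helper: {"arg1": "val1"} -> ["--arg1=val1"] for PySparkJob.args."""
    if "conf" in options:
        # Spark settings belong in the job's "properties"; a --conf argument
        # can collide with them and produce an incorrect submission.
        raise ValueError("set Spark configuration in job properties, not via --conf")
    # Each element must be a string, so values are formatted into the f-string.
    return [f"--{key}={value}" for key, value in options.items()]


print(to_driver_args({"arg1": "val1", "arg2": "val2"}))
# ['--arg1=val1', '--arg2=val2']
```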
Upvotes: 1