mang4521

Reputation: 802

Submit command line arguments to a pyspark job on airflow

I have a pyspark job available on GCP Dataproc to be triggered on airflow as shown below:

config = help.loadJSON("batch/config_file")

MY_PYSPARK_JOB = {
    "reference": {"project_id": "my_project_id"},
    "placement": {"cluster_name": "my_cluster_name"},
    "pyspark_job": {
        "main_python_file_uri": "gs://file/loc/my_spark_file.py"]
        "properties": config["spark_properties"]
        "args": <TO_BE_ADDED>
    },
}

I need to supply command line arguments to this pyspark job as shown below [this is how I run my pyspark job from the command line]:

spark-submit gs://file/loc/my_spark_file.py --arg1 val1 --arg2 val2

I am providing the arguments to my pyspark job using "configparser". Therefore, arg1 is the key and val1 is the value in my spark-submit command above.
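For reference, a simplified sketch of the kind of parsing involved in my driver script (argparse is used here purely for illustration, not my actual configparser-based code; it accepts both the "--arg1 val1" and "--arg1=val1" forms):

import argparse

# Illustrative only: read --arg1/--arg2 passed to the driver script.
parser = argparse.ArgumentParser()
parser.add_argument("--arg1")
parser.add_argument("--arg2")
args = parser.parse_args()  # handles "--arg1 val1" as well as "--arg1=val1"

print(args.arg1, args.arg2)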

How do I define the "args" param in "MY_PYSPARK_JOB" above so that it is equivalent to my command line arguments?

Upvotes: 5

Views: 1945

Answers (2)

mang4521

Reputation: 802

I finally managed to solve this conundrum. If we are making use of ConfigParser, the key has to be specified as below [irrespective of whether the argument is passed on the command line or through airflow]:

--arg1

In airflow, the arguments are passed as a Sequence[str] (as mentioned by @Betjens below) and each argument is defined as follows:

arg1=val1

Therefore, as per my requirement, command line arguments are defined as depicted below:

"args": ["--arg1=val1",
    "--arg2=val2"]

PS: Thank you @Betjens for all your suggestions.

Upvotes: 4

Betjens

Reputation: 1391

You have to pass a Sequence[str]. If you check DataprocSubmitJobOperator you will see that the job param expects a google.cloud.dataproc_v1.types.Job.

class DataprocSubmitJobOperator(BaseOperator):
    ...
    :param job: Required. The job resource. If a dict is provided, it must be of the same form
        as the protobuf message :class:`~google.cloud.dataproc_v1.types.Job`

So, in the section about the pySpark job type, which is google.cloud.dataproc_v1.types.PySparkJob:

args (Sequence[str]) – Optional. The arguments to pass to the driver. Do not include arguments, such as --conf, that can be set as job properties, since a collision may occur that causes an incorrect job submission.
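For illustration, the pyspark_job portion of the job dict carries those arguments like this (paths and values are placeholders; field names follow the PySparkJob message):

pyspark_job = {
    "main_python_file_uri": "gs://file/loc/my_spark_file.py",
    # Sequence[str]: one string per argument; avoid flags like --conf here
    "args": ["--arg1=val1", "--arg2=val2"],
}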

Upvotes: 1
