prototype sql

Reputation: 49

How do I pass parameters to a Workflow Template Spark job?

I have a problem with my Dataproc Spark workflow.

This command works when launched directly:

gcloud dataproc jobs submit spark \
--project myproject \
--cluster=mycluster \
--region=europe-west3 \
--jars=gs://path/file.jar,gs://path/depende.jar \
--class=it.flow \
--properties spark.num.executors=2,spark.executor.cores=3,spark.executor.memory=5g,spark.driver.cores=2,spark.driver.memory=10g,spark.dynamicAllocation.enabled=false,spark.executor.userClassPathFirst=true,spark.driver.userClassPathFirst=true,spark.jars.packages=com.google.cloud:google-cloud-logging:2.2.0 \
-- 20210820 010000 000 0 000 TRY

I created a Dataproc workflow template and the Python code to launch it through Composer, and that works.

Now I have to make the final parameters dynamic (-- 20210820 010000 000 0 000 TRY).

However, I am unable to pass parameters to the workflow:

gcloud dataproc workflow-templates create try1 --region=europe-west3
 
gcloud dataproc workflow-templates add-job spark \
--workflow-template=try1 \
--step-id=create_try1 \
--class=it.flow \
--region=europe-west3 \
--jars=gs://path/file.jar,gs://path/depende.jar \
--properties spark.num.executors=2,spark.executor.cores=3,spark.executor.memory=5g,spark.driver.cores=2,spark.driver.memory=10g,spark.dynamicAllocation.enabled=false,spark.executor.userClassPathFirst=true,spark.driver.userClassPathFirst=true,spark.jars.packages=com.google.cloud:google-cloud-logging:2.2.0 \
-- $arg1 $arg2
 
gcloud dataproc workflow-templates set-cluster-selector try1 --region=europe-west3 --cluster-labels=goog-dataproc-cluster-name=cluster

This call:

gcloud dataproc workflow-templates instantiate try1 --region=europe-west3 --parameters="arg1=20210820"

leads to the following error:

ERROR: (gcloud.dataproc.workflow-templates.instantiate) INVALID_ARGUMENT: Template does not contain a parameter with name arg1.

How can I resolve the issue?

YAML file:

id: create_file
jobs:
- sparkJob:
    args:
    - ARG1
    - ARG2
    jarFileUris:
    - gs://mybucket/try_file.jar
    - gs://mybucket/try_dependencies_2.jar
    mainClass: org.apache.hadoop.examples.tryFile
    properties:
      spark.driver.cores: '2'
      spark.driver.memory: 10g
      spark.driver.userClassPathFirst: 'true'
      spark.dynamicAllocation.enabled: 'false'
      spark.executor.cores: '3'
      spark.executor.memory: 5g
      spark.executor.userClassPathFirst: 'true'
      spark.jars.packages: com.google.cloud:google-cloud-logging:2.2.0
      spark.num.executors: '2'
  stepId: create_file_try
parameters:
- name: ARG1
  fields:
  - jobs['create_file_try'].sparkJob.args[0]
- name: ARG2
  fields:
  - jobs['create_file_try'].sparkJob.args[1]
name: projects/My-project-id/regions/europe-west3/workflowTemplates/create_file
updateTime: '2021-08-25T07:49:59.251096Z'

Upvotes: 3

Views: 1699

Answers (1)

Ricco D

Reputation: 7287

For your workflow template to accept parameters, it is much better to use a YAML file. You can get the YAML when you run your full gcloud dataproc workflow-templates add-job spark command, since it prints the resulting YAML configuration to the CLI.
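If you have already created the template, you should also be able to dump its current YAML with the export subcommand (a sketch using the template name and region from the question; config.yaml is just an example file name):

gcloud dataproc workflow-templates export try1 \
    --region=europe-west3 \
    --destination=config.yaml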

In this example I just used sample code from the Dataproc documentation, along with your --properties values, for the sake of testing.

NOTE: I used a dummy project-id in the yaml files for this example. Make sure you use your actual project-id so you won't encounter any problems.

Sample command:

gcloud dataproc workflow-templates add-job spark \
--workflow-template=try1 \
--step-id=create_try1 \
--class=org.apache.hadoop.examples.WordCount \
--region=europe-west3 \
--jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
--properties spark.num.executors=2,spark.executor.cores=3,spark.executor.memory=5g,spark.driver.cores=2,spark.driver.memory=10g,spark.dynamicAllocation.enabled=false,spark.executor.userClassPathFirst=true,spark.driver.userClassPathFirst=true,spark.jars.packages=com.google.cloud:google-cloud-logging:2.2.0 \
-- ARG1 ARG2  

CLI output (YAML config):

id: try1
jobs:
- sparkJob:
    args:
    - ARG1
    - ARG2
    jarFileUris:
    - file:///usr/lib/spark/examples/jars/spark-examples.jar
    mainClass: org.apache.hadoop.examples.WordCount
    properties:
      spark.driver.cores: '2'
      spark.driver.memory: 10g
      spark.driver.userClassPathFirst: 'true'
      spark.dynamicAllocation.enabled: 'false'
      spark.executor.cores: '3'
      spark.executor.memory: 5g
      spark.executor.userClassPathFirst: 'true'
      spark.jars.packages: com.google.cloud:google-cloud-logging:2.2.0
      spark.num.executors: '2'
  stepId: create_try1
name: projects/your-project-id/regions/europe-west3/workflowTemplates/try1
placement:
  managedCluster:
    clusterName: mycluster
updateTime: '2021-08-25T03:30:47.365244Z'
version: 3

Copy the generated YAML config into a text editor and add a parameters: field. It lists the arguments the template will accept.

parameters:
- name: ARG1
  fields:
  - jobs['create_try1'].sparkJob.args[0] # use the stepId in jobs[], in this example it is 'create_try1'
- name: ARG2
  fields:
  - jobs['create_try1'].sparkJob.args[1]

In this example I placed it after stepId:.

Edited YAML config:

id: try1
jobs:
- sparkJob:
    args:
    - ARG1
    - ARG2
    jarFileUris:
    - file:///usr/lib/spark/examples/jars/spark-examples.jar
    mainClass: org.apache.hadoop.examples.WordCount
    properties:
      spark.driver.cores: '2'
      spark.driver.memory: 10g
      spark.driver.userClassPathFirst: 'true'
      spark.dynamicAllocation.enabled: 'false'
      spark.executor.cores: '3'
      spark.executor.memory: 5g
      spark.executor.userClassPathFirst: 'true'
      spark.jars.packages: com.google.cloud:google-cloud-logging:2.2.0
      spark.num.executors: '2'
  stepId: create_try1
parameters:
- name: ARG1
  fields:
  - jobs['create_try1'].sparkJob.args[0]
- name: ARG2
  fields:
  - jobs['create_try1'].sparkJob.args[1]
name: projects/your-project-id/regions/europe-west3/workflowTemplates/try1
placement:
  managedCluster:
    clusterName: mycluster
updateTime: '2021-08-25T03:13:25.014685Z'
version: 3

Use the edited YAML file to overwrite your workflow template:

gcloud dataproc workflow-templates import try1 \
    --region=europe-west3 \
    --source=config.yaml
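
To verify that the import worked, you can describe the template and check that the parameters: block is present (same template name and region as above):

gcloud dataproc workflow-templates describe try1 \
    --region=europe-west3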

Run the template using gcloud dataproc workflow-templates instantiate:

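For example, passing values for both arguments (20210820 and 010000 here are just placeholder values; instantiation generally requires a value for every template parameter that has no default):

gcloud dataproc workflow-templates instantiate try1 \
    --region=europe-west3 \
    --parameters="ARG1=20210820,ARG2=010000"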

For more details you can refer to Parameterization of Workflow Templates.

Upvotes: 5
