Reputation: 2210
I'm playing around with Gcloud Composer, trying to create a DAG that creates a DataProc cluster, runs a simple Spark job, then tears down the cluster. I am trying to run the Spark PI example job.
I understand that when calling DataProcSparkOperator I can choose only to define either the main_jar
or the main_class
property. When I define main_class
, the job fails with the error:
java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:239)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
When I choose to define the main_jar
property, the job fails with the error:
Error: No main class set in JAR; please specify one with --class
Run with --help for usage help or --verbose for debug output
I'm at a bit of a loss as to how to resolve this, as I am kinda new to both Spark and DataProc.
My DAG:
import datetime as dt
from airflow import DAG, models
from airflow.contrib.operators import dataproc_operator as dpo
from airflow.utils import trigger_rule
MAIN_JAR = 'file:///usr/lib/spark/examples/jars/spark-examples.jar'
MAIN_CLASS = 'org.apache.spark.examples.SparkPi'
CLUSTER_NAME = 'quickspark-cluster-{{ ds_nodash }}'
yesterday = dt.datetime.combine(
dt.datetime.today() - dt.timedelta(1),
dt.datetime.min.time())
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': dt.timedelta(seconds=30),
'project_id': models.Variable.get('gcp_project')
}
with DAG('dataproc_spark_submit', schedule_interval='0 17 * * *',
default_args=default_dag_args) as dag:
create_dataproc_cluster = dpo.DataprocClusterCreateOperator(
project_id = default_dag_args['project_id'],
task_id = 'create_dataproc_cluster',
cluster_name = CLUSTER_NAME,
num_workers = 2,
zone = models.Variable.get('gce_zone')
)
run_spark_job = dpo.DataProcSparkOperator(
task_id = 'run_spark_job',
#main_jar = MAIN_JAR,
main_class = MAIN_CLASS,
cluster_name = CLUSTER_NAME
)
delete_dataproc_cluster = dpo.DataprocClusterDeleteOperator(
project_id = default_dag_args['project_id'],
task_id = 'delete_dataproc_cluster',
cluster_name = CLUSTER_NAME,
trigger_rule = trigger_rule.TriggerRule.ALL_DONE
)
create_dataproc_cluster >> run_spark_job >> delete_dataproc_cluster
Upvotes: 1
Views: 1583
Reputation: 7058
I compared it with a successful job using the CLI and saw that, even when the class was populating the Main class or jar
field, the path to the Jar was specified in Jar files
:
Checking the operator I noticed there is also a dataproc_spark_jars
parameter which is not mutually exclusive to main_class
:
run_spark_job = dpo.DataProcSparkOperator(
task_id = 'run_spark_job',
dataproc_spark_jars = [MAIN_JAR],
main_class = MAIN_CLASS,
cluster_name = CLUSTER_NAME
)
Adding it did the trick:
Upvotes: 4