Reputation: 31
I am currently trying to spark-submit a fat jar to a local cluster. The application was developed with Spark 2.4.6 and Scala 2.11.12. Upon submitting to the cluster, I receive this error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2
My spark-submit command (run in the Windows command prompt):
spark-submit --class main.app --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6 my_app_name-1.0-SNAPSHOT-jar-with-dependencies.jar
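For reference, the Spark and Scala versions of the installation that actually runs the job can be checked with:
spark-submit --version
The banner it prints includes the Scala version that Spark build was compiled against.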
Other details:
My pom file
[....headers and stuff]
<groupId>org.example</groupId>
<artifactId>my_app_name</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
    <scala.version>2.11.12</scala.version>
</properties>
<repositories>
    <repository>
        <id>scala-tools.org</id>
        <name>Scala-Tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
    </repository>
</repositories>
<pluginRepositories>
    <pluginRepository>
        <id>scala-tools.org</id>
        <name>Scala-Tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
</pluginRepositories>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-api -->
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-api</artifactId>
        <version>5.6.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.specs</groupId>
        <artifactId>specs</artifactId>
        <version>1.2.5</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-avro -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>2.4.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.4.3</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>2.4.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-tools -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-tools</artifactId>
        <version>2.4.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.databricks/spark-csv -->
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.11</artifactId>
        <version>1.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.7.4</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.3.3</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <!-- see http://davidb.github.com/scala-maven-plugin -->
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.3.2</version>
            <configuration>
                <recompileMode>incremental</recompileMode> <!-- NOTE: incremental compilation although faster requires passing to MAVEN_OPTS="-XX:MaxPermSize=128m" -->
                <!-- addScalacArgs>-feature</addScalacArgs -->
                <args>
                    <arg>-Yresolve-term-conflict:object</arg> <!-- required for package/object name conflict in Jenkins jar -->
                </args>
                <javacArgs>
                    <javacArg>-Xlint:unchecked</javacArg>
                    <javacArg>-Xlint:deprecation</javacArg>
                </javacArgs>
            </configuration>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>
                                    ingest_package.object_ingest
                                </mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
[....footers and stuff]
My Main App File
package main
import java.nio.file.{Files, Paths}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.to_avro
import org.apache.spark.sql.functions.{date_format, struct}
object app {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("parquet_ingest_engine")
      .getOrCreate()

    Logger.getLogger("org").setLevel(Level.ERROR)

    val accessKeyId = System.getenv("AWS_ACCESS_KEY_ID")
    val secretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY")

    val person_df = spark.read.format("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat").load("s3_parquet_path_here")
    val person_df_reformatted = person_df.withColumn("registration_dttm_string", date_format(person_df("registration_dttm"), "MM/dd/yyyy hh:mm"))
    val person_df_final = person_df_reformatted.select("registration_dttm_string", "id", "first_name", "last_name", "email", "gender", "ip_address", "cc", "country", "birthdate", "salary", "title", "comments")

    person_df_final.printSchema()
    person_df_final.show(5)

    val person_avro_schema = new String(Files.readAllBytes(Paths.get("input\\person_schema.avsc")))
    print(person_avro_schema)

    person_df_final.write.format("avro").mode("overwrite").option("avroSchema", person_avro_schema).save("output/person.avro")
    print("\n" + "=====================successfully wrote avro to local path=====================" + "\n")

    person_df_final.select(to_avro(struct("registration_dttm_string", "id", "first_name", "last_name", "email", "gender", "ip_address", "cc", "country", "birthdate", "salary", "title", "comments")) as "value")
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "spark_topic_test")
      .save()
    print("\n" + "========================Successfully wrote to avro consumer on localhost kafka consumer========================" + "\n" + "\n")
  }
}
Upvotes: 2
Views: 10948
Reputation: 87164
First, you have problems with dependencies:

- com.databricks:spark-csv_2.11 - CSV support has been built into Spark itself for a long time, so this dependency is unnecessary.
- org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6, spark-sql and spark-core need to be declared with <scope>provided</scope>, like here (a sketch of what this could look like follows this list).
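As an illustration only, a minimal sketch of provided-scope declarations for the question's pom (artifact ids taken from the question; versions aligned with the cluster's Spark 2.4.6 / Scala 2.11, as recommended above):

<!-- sketch: Spark artifacts marked provided so the fat jar does not bundle them -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.4.6</version>
    <scope>provided</scope>
</dependency>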
Second, the problem could come from an incorrect Scala version (for example, you didn't run mvn clean after changing it) - if the code works with Spark 3.0, then it was compiled with Scala 2.12, while Spark 2.4.6 works only with Scala 2.11.

I strongly recommend getting rid of the unnecessary dependencies, using provided scope, running mvn clean, etc.
Upvotes: 1
Reputation: 111
I met the same error and solved it by using a jar whose Scala version and Spark version match the running Spark. The package you are using (org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6) is consistent with your Spark, but you could try changing the version to a close one (e.g. org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0).
My Spark is "version 2.4.4 using Scala version 2.11.12". When I read an Avro file using a Scala 2.12 jar (spark-avro_2.12), I got exactly the same error:
spark-shell --packages org.apache.spark:spark-avro_2.12:3.1.2
It was fixed after changing to:
spark-shell --packages com.databricks:spark-avro_2.11:2.4.0
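In the same spirit, a Spark 2.4.4 / Scala 2.11 installation could also use the matching Apache artifact, i.e. the _2.11 build of the same Spark version (shown only as a sketch of the version-matching idea):
spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.4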
Upvotes: 1