Reputation: 182
I am running Spark Structured Streaming with Kafka. Below is the pom.xml:
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <!-- Put the Scala version of the cluster -->
    <scalaVersion>2.12.10</scalaVersion>
    <sparkVersion>3.0.1</sparkVersion>
</properties>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>${sparkVersion}</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>${sparkVersion}</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>${sparkVersion}</version>
</dependency>
I am building the fat jar with the Maven Shade plugin. The jar runs as expected in my local setup with this command:
spark-submit --master local[*] --class com.stream.Main --num-executors 3 --driver-memory 2g --executor-cores 2 --executor-memory 3g prism-event-synch-rta.jar
But when I try to run the same jar on a Spark cluster using YARN with this command:
spark-submit --master yarn --deploy-mode cluster --class com.stream.Main --num-executors 4 --driver-memory 2g --executor-cores 1 --executor-memory 4g gs://jars/prism-event-synch-rta.jar
I get this exception:
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
I have tried setting "partition.assignment.strategy" explicitly, but it still does not work.
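For context, here is a minimal sketch of how I set that option on the Kafka source (the app name, broker address and topic are placeholders; the real code lives in com.stream.Main):

import org.apache.spark.sql.SparkSession

// Minimal sketch; broker and topic names are placeholders
val spark = SparkSession.builder.appName("prism-event-synch-rta").getOrCreate()

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker-1:9092")
  .option("subscribe", "events")
  // options prefixed with "kafka." are passed through to the Kafka consumer,
  // so this is how I tried to force the assignor explicitly
  .option("kafka.partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.RangeAssignor")
  .load()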
EDIT: I also tried shipping the Kafka client with the --packages option. The result is the same exception.
spark-submit --packages org.apache.kafka:kafka-clients:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 --master yarn --deploy-mode cluster --class com.stream.Main --num-executors 4 --driver-memory 2g --executor-cores 1 --executor-memory 4g gs://jars/prism-event-synch-rta.jar
Please help.
Upvotes: 0
Views: 1482
Reputation: 182
I suspected the issue was with the Kafka client jar. My first hunch was that the kafka-clients classes bundled in the fat jar were being overridden by another jar on the cluster classpath.
To test the theory, I installed the same Spark version (3.0.1) locally and ran the job. It sailed through beautifully.
I then logged into the Dataproc cluster (Spark 3.0.1) and ran sudo find /usr -name "kafka-clients*", and voila, I found an old version of the jar: /usr/lib/hadoop/lib/kafka-clients-0.8.2.1.jar. I opened spark-env.sh to verify whether that path was being loaded at runtime, and to no surprise it was.
Now we know the root cause: the old kafka-clients jar shipped with the cluster wins over the one packaged in my fat jar. The fix is to stop my classes from colliding with the provided jar. Some research turned up the Maven Shade plugin's relocation feature, which renames the packages while packaging:
<relocations>
    <relocation>
        <pattern>org.apache.kafka</pattern>
        <shadedPattern>shade.org.apache.kafka</shadedPattern>
    </relocation>
</relocations>
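For anyone wondering where this goes: the relocations element sits inside the shade plugin's configuration, roughly like the sketch below (the plugin version is only an example, use whatever your build already declares):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.4</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <relocations>
                    <relocation>
                        <pattern>org.apache.kafka</pattern>
                        <shadedPattern>shade.org.apache.kafka</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>

With the packages relocated, the calls from my code resolve to the shaded kafka-clients classes inside the fat jar instead of the old jar on the cluster classpath.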
Upvotes: 2