Reputation: 21
I've been using pyspark for Spark Streaming (Spark 2.0.2) with Kafka (0.10.1.0) successfully before, but my use case is better suited to Structured Streaming. I've attempted to follow the example online: https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html
with the following analogous code:
# Read the Kafka topic as a streaming DataFrame
ds1 = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1")
    .load())

# Write each micro-batch to the console
query = (ds1
    .writeStream
    .outputMode("append")
    .format("console")
    .start())

query.awaitTermination()
However, I always end up with the following error:
: org.apache.kafka.common.config.ConfigException:
Missing required configuration "partition.assignment.strategy" which has no default value
I also tried adding this to my set of options when creating ds1:
.option("partition.assignment.strategy", "range")
But even explicitly assigning it a value didn't stop the error, nor did any other value (like "roundrobin") that I could find online or in the Kafka documentation.
I also tried this with the "assign" option and got the same error (our Kafka setup uses assign: each consumer is assigned exactly one partition, and we don't do any rebalancing).
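For reference, a minimal sketch of the assign form from the integration guide linked above (the topic name and partition number are placeholders):

ds1 = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    # "assign" takes a JSON string mapping each topic to a list of partitions
    .option("assign", '{"topic1":[0]}')
    .load())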
Any idea what's going on here? The documentation isn't much help (probably because the feature is still experimental). Also, is there any way to do Structured Streaming using KafkaUtils, or is this the only gateway?
Upvotes: 2
Views: 3306
Reputation: 11
Add the kafka-clients-*.jar to your Spark jars folder, then restart the Spark master and worker. After that you don't need to add .option("partition.assignment.strategy", "range") at all.
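If you can't modify the Spark installation itself, an alternative (an untested sketch; the package coordinates below assume Spark 2.1.0 on Scala 2.11 with the Kafka 0.10.0.1 client, so adjust them for your cluster) is to have Spark pull the jars in via spark.jars.packages when building the session:

from pyspark.sql import SparkSession

# Coordinates are assumptions for Spark 2.1.0 / Scala 2.11 / Kafka 0.10.0.1
spark = (SparkSession.builder
    .appName("kafka-structured-streaming")
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,"
            "org.apache.kafka:kafka-clients:0.10.0.1")
    .getOrCreate())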
Upvotes: 1
Reputation: 81
I was having this issue when using Structured Streaming in Spark 2.3.2. As @bruce.liu hinted in his answer, it happens when Spark's JVM does not have a kafka-clients-*.jar on its classpath.
I fixed it by downloading the kafka-clients jar (https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/0.10.0.1) and then providing it to spark-submit with the --jars and --driver-class-path options.
Something like this:
spark-submit --class MainClass --master local[*] --jars local:///root/sources/jars/kafka-clients-0.10.0.1.jar --driver-class-path local:///root/sources/jars/kafka-clients-0.10.0.1.jar app.jar
Upvotes: 1
Reputation: 20816
There is a known issue in the Kafka 0.10.1.* client, and you should not use it with Spark because it may produce wrong answers due to https://issues.apache.org/jira/browse/KAFKA-4547 . You can use the 0.10.0.1 client instead; it should still work with a 0.10.1.* Kafka cluster.
To pass a configuration through to the underlying Kafka consumer in Structured Streaming, you need to add the kafka. prefix, e.g., .option("kafka.partition.assignment.strategy", "range"). However, you don't need to set kafka.partition.assignment.strategy at all, because it has a default value. My hunch is that you have both Kafka 0.8.* and 0.10.* jars on the classpath and the wrong classes are being loaded.
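For illustration, a hedged sketch of forwarding an arbitrary consumer property through the kafka. prefix (fetch.min.bytes is used purely as an example; hosts and topic are the placeholders from the question):

ds1 = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    # Any Kafka consumer property can be forwarded by prefixing it with "kafka."
    .option("kafka.fetch.min.bytes", "1")
    .option("subscribe", "topic1")
    .load())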
Which KafkaUtils API do you want to use that is missing from Structured Streaming? Spark 2.2.0 has just come out, and you can run both batch and streaming queries against Kafka with Structured Streaming. See http://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html for examples.
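For example, a minimal batch query against Kafka (a sketch using the placeholder hosts and topic from the question):

df = (spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1")
    .load())

# Kafka keys/values arrive as binary, so cast them for display
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()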
Upvotes: 5