Reputation: 1091
I am trying to write a Spark DataFrame (batch DF) to Kafka, and I need to write the data to specific partitions.
I tried the following code:
myDF.write
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaProps.getBootstrapServers)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.truststore.location", kafkaProps.getTrustStoreLocation)
  .option("kafka.truststore.password", kafkaProps.getTrustStorePassword)
  .option("kafka.keystore.location", kafkaProps.getKeyStoreLocation)
  .option("kafka.keystore.password", kafkaProps.getKeyStorePassword)
  .option("kafka.partitioner.class", "util.MyCustomPartitioner")
  .option("topic", kafkaProps.getTopicName)
  .save()
The schema of the DF I am writing is:
+---+---------+-----+
|key|partition|value|
+---+---------+-----+
+---+---------+-----+
I had to repartition "myDF" to a single partition because I need to order the data by a date column.
The data is written to a single Kafka partition, but not to the one given in the DF's "partition" column, nor to the one returned by the custom partitioner (which returns the same value as the "partition" column).
Thanks, Sateesh
Upvotes: 1
Views: 723
Reputation: 18475
Using the "partition" column of your DataFrame is only supported in Spark 3.x; according to the 2.4.7 docs it is not available in earlier versions.
However, the option kafka.partitioner.class will still work. Spark's Kafka integration passes any option with the prefix kafka. to the underlying Kafka client as plain configuration (KafkaProducer configuration when writing), so this will also work on version 2.4.4.
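Keep in mind that a partitioner only receives the topic, key, and value of each record, so on 2.4.x it cannot read a separate "partition" column. One possible workaround, sketched here under the assumption that the target partition number is written as a decimal string into the "key" column, is to let the partitioner parse it from the key bytes. `KeyPartitionLogic.fromKey` is a hypothetical helper (not a Kafka or Spark API), and the fallback to partition 0 is an arbitrary choice for this sketch:

```scala
import java.nio.charset.StandardCharsets
import scala.util.Try

// Hypothetical helper, not part of Kafka or Spark.
object KeyPartitionLogic {
  // Interpret the key bytes as a decimal partition number.
  // Falls back to partition 0 when the key is missing, is not a number,
  // or lies outside the range [0, numPartitions).
  def fromKey(keyBytes: Array[Byte], numPartitions: Int): Int = {
    val parsed = Option(keyBytes)
      .map(bytes => new String(bytes, StandardCharsets.UTF_8).trim)
      .flatMap(text => Try(text.toInt).toOption)
    parsed.filter(p => p >= 0 && p < numPartitions).getOrElse(0)
  }
}
```

A custom `partition` method could call this with its `keyBytes` argument and `cluster.partitionsForTopic(topic).size()`. Note that this repurposes the record key, which may not be acceptable if consumers rely on it.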
The code below runs fine with Spark 3.0.1 and Confluent Community Edition 5.5.0. On Spark 2.4.4, the "partition" column has no effect, but my custom partitioner class is applied.
import org.apache.spark.sql.SparkSession

case class KafkaRecord(partition: Int, value: String)

val spark = SparkSession.builder()
  .appName("test")
  .master("local[*]")
  .getOrCreate()

// create DataFrame with the target Kafka partition and the record value
import spark.implicits._
val df = Seq((0, "Alice"), (1, "Bob")).toDF("partition", "value").as[KafkaRecord]

// write to Kafka; on Spark 3.x the "partition" column selects the partition
df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test")
  .save()
What you then see in the console-consumer:
# partition 0
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test --partition 0
Alice
and
# partition 1
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test --partition 1
Bob
I also get the same results when using a custom Partitioner:
.option("kafka.partitioner.class", "org.test.CustomPartitioner")
where my custom Partitioner is defined as
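package org.test

import java.util
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

class CustomPartitioner extends Partitioner {
  // send the value "Bob" to partition 0 and everything else to partition 1
  override def partition(topic: String, key: Any, keyBytes: Array[Byte],
                         value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    if (valueBytes != null && valueBytes.map(_.toChar).mkString == "Bob") {
      0
    } else {
      1
    }
  }
  // required by the Partitioner interface; nothing to configure or release here
  override def configure(configs: util.Map[String, _]): Unit = {}
  override def close(): Unit = {}
}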
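The decision rule of such a partitioner can be checked without a running broker. A minimal sketch, assuming the rule is pulled out into a hypothetical helper `PartitionLogic.choose` (not part of Kafka, and not in the class above) that mirrors the "Bob" rule, with a null check for records without a value:

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper: "Bob" goes to partition 0, everything else to partition 1.
// It is kept free of Kafka types so it can be exercised in a plain unit test.
object PartitionLogic {
  def choose(valueBytes: Array[Byte]): Int = {
    val isBob = valueBytes != null &&
      new String(valueBytes, StandardCharsets.UTF_8) == "Bob"
    if (isBob) 0 else 1
  }
}
```

The `partition` method of the custom partitioner could then simply delegate to `PartitionLogic.choose(valueBytes)`.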
Upvotes: 1