Sateesh K

Reputation: 1091

Writing Spark DataFrame to Kafka is ignoring the partition column and kafka.partitioner.class

I am trying to write a Spark DataFrame (a batch DataFrame) to Kafka, and I need to write the data to specific partitions.

I tried the following code

myDF.write
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaProps.getBootstrapServers)
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", kafkaProps.getTrustStoreLocation)
  .option("kafka.ssl.truststore.password", kafkaProps.getTrustStorePassword)
  .option("kafka.ssl.keystore.location", kafkaProps.getKeyStoreLocation)
  .option("kafka.ssl.keystore.password", kafkaProps.getKeyStorePassword)
  .option("kafka.partitioner.class", "util.MyCustomPartitioner")
  .option("topic", kafkaProps.getTopicName)
  .save()

The schema of the DataFrame I am writing is

+---+---------+-----+
|key|partition|value|
+---+---------+-----+
+---+---------+-----+

I had to repartition "myDF" to a single partition because I need to order the data by a date column.

The data is written to a single Kafka partition, but not to the one given in the DataFrame's "partition" column, nor to the one returned by the custom partitioner (which is the same as the value in the partition column).
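For context, below is a minimal sketch of how a DataFrame like this could be prepared. The source columns, values, and the "eventDate" ordering column are assumptions for illustration, not my actual data:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical sketch: build a key/partition/value DataFrame and keep it ordered
// by a date column by collapsing to one Spark partition and sorting within it.
val spark = SparkSession.builder().appName("kafka-write-sketch").master("local[*]").getOrCreate()
import spark.implicits._

val sourceDF = Seq(
  ("k1", 0, "first", "2021-01-01"),
  ("k2", 1, "second", "2021-01-02")
).toDF("id", "targetPartition", "payload", "eventDate")  // assumed input columns

val myDF = sourceDF
  .repartition(1)                          // single Spark partition, as described above
  .sortWithinPartitions(col("eventDate"))  // keeps the date ordering inside that partition
  .select(
    col("id").as("key"),                     // Kafka message key (string)
    col("targetPartition").as("partition"),  // desired Kafka partition (int)
    col("payload").as("value")               // Kafka message value (string)
  )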

Thanks Sateesh

Upvotes: 1

Views: 723

Answers (1)

Michael Heil

Reputation: 18475

The ability to use a "partition" column in your DataFrame is only available in Spark 3.x, not in earlier versions, according to the 2.4.7 docs.

However, using the option kafka.partitioner.class will still work. Spark Structured Streaming lets you pass plain Kafka producer configuration through the kafka. prefix, so this also works on version 2.4.4.
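As a rough sketch, on Spark 2.4.x you would rely on that prefixed producer config alone (the bootstrap server and topic are placeholders here; util.MyCustomPartitioner is the class name from the question and must be on the executor classpath):

// Sketch for Spark 2.4.x: the "partition" column is ignored, so routing has to
// come from the kafka.-prefixed producer config below.
myDF.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.partitioner.class", "util.MyCustomPartitioner")
  .option("topic", "test")
  .save()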

The code below runs fine with Spark 3.0.1 and Confluent Community Edition 5.5.0. On Spark 2.4.4 the "partition" column has no impact, but my custom partitioner class still applies.

import org.apache.spark.sql.SparkSession

case class KafkaRecord(partition: Int, value: String)

val spark = SparkSession.builder()
  .appName("test")
  .master("local[*]")
  .getOrCreate()

// create a DataFrame with the column names the Kafka sink expects
import spark.implicits._
val df = Seq((0, "Alice"), (1, "Bob")).toDF("partition", "value").as[KafkaRecord]

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test")
  .save()

What you then see in the console-consumer:

# partition 0
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test --partition 0
Alice

and

# partition 1
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test --partition 1
Bob

I also get the same results when using a custom partitioner

.option("kafka.partitioner.class", "org.test.CustomPartitioner")

where my custom Partitioner is defined as

package org.test

import java.util
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

class CustomPartitioner extends Partitioner {

  override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    // route the record whose value is "Bob" to partition 0, everything else to partition 1
    if (!valueBytes.isEmpty && valueBytes.map(_.toChar).mkString == "Bob") 0 else 1
  }

  // required by the Partitioner interface, nothing to configure or clean up here
  override def configure(configs: util.Map[String, _]): Unit = {}
  override def close(): Unit = {}
}

Upvotes: 1
