collarblind

Reputation: 4739

How does the number of partitions affect Spark Kafka connections?

I am spinning up an EMR cluster to publish a DataFrame (about 300-400 rows) to Kafka. I am able to publish it, and the DataFrame has 200 partitions. While publishing it, I see a huge CPU spike in the Kafka cluster for about 20-30 minutes. Does having 200 partitions create 200 connections?

Or does it use one connection, as stated here: http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#producer-caching

Sample code

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
import org.apache.spark.sql.functions.col
val kafkaOptions = Map("kafka.bootstrap.servers" -> s"$host:$port",
        "kafka.security.protocol" -> "SSL",
        "kafka.ssl.endpoint.identification.algorithm" -> "",
        "kafka.ssl.truststore.location" -> "/home/hadoop/client.truststore.jks",
        "kafka.ssl.truststore.password" -> "password",
        "kafka.ssl.keystore.type" -> "PKCS12",
        "kafka.ssl.key.password" -> "password",
        "kafka.ssl.keystore.location" -> "/home/hadoop/client.keystore.p12",
        "kafka.ssl.keystore.password" -> "password")

 val df = spark
        .read
        .option("header", true)
        .option("escape", "\"")
        .csv("s3://bucket/file.csv")

 val publishToKafkaDf = df.withColumn("value", col("body"))

 publishToKafkaDf
      .selectExpr( "CAST(value AS STRING)")
      .write
      .format("kafka")
      .option("topic", "test-topic")
      .options(kafkaOptions)
      .save()
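
In case it helps, this is how the partition count can be confirmed (a quick check against the same publishToKafkaDf as above):

// Prints how many partitions (and therefore write tasks) the DataFrame has.
println(publishToKafkaDf.rdd.getNumPartitions)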

Upvotes: 2

Views: 694

Answers (1)

Jacek Laskowski

Reputation: 74619

I am able to publish it, and the DataFrame has 200 partitions. While publishing it, I see a huge CPU spike in the Kafka cluster for about 20-30 minutes. Does having 200 partitions create 200 connections?

As per the Producer Caching section you mentioned:

Spark initializes a Kafka producer instance and co-use across tasks for same caching key.

That tells me there will be one Kafka producer shared across all tasks on a single executor. (I haven't checked the sources, though, so I'm not entirely sure.)

In other words, the partitions (which become tasks at execution time) are distributed across the available executors. If you've got 10 executors, my understanding is that there will be 10 Kafka producers.
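
If the CPU spike correlates with how many write tasks run in parallel, one thing you could try is reducing the partition count before the write. A minimal sketch reusing the names from your question (the target of 10 partitions is just an illustrative number, not a recommendation):

// Fewer partitions means fewer concurrent write tasks hitting the brokers.
// coalesce only narrows the existing partitions, so it avoids a full shuffle.
publishToKafkaDf
  .coalesce(10)
  .selectExpr("CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("topic", "test-topic")
  .options(kafkaOptions)
  .save()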


Please note that the documentation is for the latest Spark 3.0.0, while you are using Spark 2.3.0, based on:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0

I don't think it matters much, as this one-Kafka-producer-per-executor approach was used in earlier versions as well. The sharing and caching may have been improved in 3.0, though.
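
For reference, if you upgrade to Spark 3.0.0 (the version the linked documentation describes), the matching connector artifact would be the Scala 2.12 build:

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0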

Upvotes: 2
