hampi2017

Reputation: 731

Kafka consumer group and partitions with Spark structured streaming

I have a Kafka topic with 3 partitions and I'm consuming that data using Spark Structured Streaming. I have 3 consumers (let's say consumer group A), each reading from a single partition; everything is working fine so far.

I have a new requirement to read from the same topic, and I want to parallelize it by again creating 3 consumers (say consumer group B), each reading from a single partition. As I'm using Structured Streaming, I can't specify group.id explicitly.

Will consumers from different groups pointing to the same partition each read all the data?

Upvotes: 2

Views: 2230

Answers (3)

Pardeep

Reputation: 1005

From the Spark 3.0.1 documentation:

By default, each query generates a unique group id for reading data. This ensures that each Kafka source has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics.

So, if you use the assign option to specify which partitions to read, each query will still read all the data from those partitions, because by its default nature each query runs under its own consumer group (group.id). The assign option takes a JSON string as a value and can list multiple partitions from different topics as well, e.g. {"topicA":[0,1],"topicB":[2,4]}.

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("assign", "{"topic-name":[0]}")
  .load()
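
A streaming variant works the same way with readStream (a minimal sketch; the topic name and server address are placeholders), and Scala triple-quoted strings keep the JSON readable:

val streamingDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("assign", """{"topic-name":[0,1,2]}""")  // all 3 partitions of the topic
  .load()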

Upvotes: 2

Michael Heil

Reputation: 18525

Unless you are using Spark 3.x or higher, you will not be able to set the group.id in your Kafka input stream. With Spark 3.x you could, as you have mentioned, have two different Structured Streaming jobs providing two different group.id values to ensure that each job reads all messages of the topic independently of the other job.
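
A minimal Scala sketch of that (topic, server, and group names are placeholders); note that in Spark 3.x the option key carries the kafka. prefix:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic-name")
  .option("kafka.group.id", "consumer-group-B")  // settable in Spark 3.x+ only
  .load()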

For Spark versions <= 2.4.x, Spark itself will create a unique consumer group for you, as you can see in the code on GitHub:

// Each running query should use its own group id. Otherwise, the query may be only 
// assigned partial data since Kafka will assign partitions to multiple consumers having
// the same group id. Hence, we should generate a unique id for each query.
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

So, in that case as well, having two different streaming jobs ensures that you have two different consumer groups, which allows both jobs to read all messages from the topic independently of each other.
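
As a sketch (topic and server names are placeholders), simply starting two queries against the same topic is enough; each gets its own auto-generated spark-kafka-source-... group id and therefore sees all three partitions:

val dfA = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic-name")
  .load()

val dfB = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic-name")
  .load()

// Two independent queries; Spark assigns a unique consumer group to each.
val queryA = dfA.writeStream.format("console").start()
val queryB = dfB.writeStream.format("console").start()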

Upvotes: 4

BdEngineer

Reputation: 3209

You can set the consumer group for streaming as below; this requires Spark 3.x, and the option key must carry the kafka. prefix:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

String processingGroup = "processingGroupA";

Dataset<Row> raw_df = sparkSession
                      .readStream()
                      .format("kafka")
                      .option("kafka.bootstrap.servers", consumerAppProperties.getProperty(BOOTSTRAP_SERVERS_CONFIG))
                      .option("subscribe", topicName)
                      .option("startingOffsets", "latest")
                      .option("kafka.group.id", processingGroup)  // Spark 3.x+; the key needs the "kafka." prefix
                      .load();
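
One caveat from the Spark documentation: kafka.group.id should be used with caution, since concurrently running queries that share the same group id can interfere with each other, which is exactly why Spark generates a unique group id per query by default.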

Upvotes: -2
