Reputation: 731
I have a Kafka topic with 3 partitions and I'm consuming that data using Spark Structured Streaming. I have 3 consumers (let's say consumer group A), each reading from a single partition; everything is working fine so far.
I have a new requirement to read from the same topic, and I want to parallelize it by again creating 3 consumers (say consumer group B), each reading from a single partition. Since I'm using Structured Streaming, I can't set group.id explicitly.
Will consumers from different groups pointing to the same partition each read all the data?
Upvotes: 2
Views: 2230
Reputation: 1005
From Spark 3.0.1 documentation:
By default, each query generates a unique group id for reading data. This ensures that each Kafka source has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics.
So, if you use the assign option and specify which partition to consume, the query will read all data from that partition, because by default it runs under its own consumer group (group.id). The assign option takes a JSON string as its value and can list multiple partitions from different topics as well, e.g. {"topicA":[0,1],"topicB":[2,4]}.
val df = spark
  .read  // use .readStream for a Structured Streaming query
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("assign", """{"topic-name":[0]}""")
  .load()
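For completeness, a minimal sketch of the multi-topic form, reusing the hypothetical topicA/topicB names and the host:port placeholder from above:

// Sketch: one streaming query assigned specific partitions of two topics.
// "topicA" and "topicB" are placeholder names taken from the example above.
val multiDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("assign", """{"topicA":[0,1],"topicB":[2,4]}""")
  .load()
// The query gets its own auto-generated group.id, so it reads these
// partitions in full regardless of any other consumer group.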
Upvotes: 2
Reputation: 18525
Unless you are using Spark 3.x or higher, you will not be able to set the group.id in your Kafka input stream. With Spark 3.x you could, as you have mentioned, run two different Structured Streaming jobs providing two different group.ids to ensure that each job reads all messages of the topic independently of the other job.
For Spark versions <= 2.4.x, Spark itself creates a unique consumer group for you, as you can see in the code on GitHub:
// Each running query should use its own group id. Otherwise, the query may be only
// assigned partial data since Kafka will assign partitions to multiple consumers having
// the same group id. Hence, we should generate a unique id for each query.
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
So, also in that case, having two different streaming jobs ensures that you have two different consumer groups, which allows both jobs to read all messages from the topic independently of each other.
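As a minimal sketch of that behaviour (assuming a SparkSession named spark as in the snippets above, a broker at localhost:9092, and a topic called my-topic, all placeholders), two queries started against the same topic each get their own auto-generated consumer group and therefore each see the full stream:

// Sketch: two independent Structured Streaming queries on the same topic.
// "localhost:9092" and "my-topic" are assumed placeholder values.
def readTopic() = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()

// Each start() launches a query with its own spark-kafka-source-<UUID>-...
// group id, so queryA and queryB both consume every record of the topic.
val queryA = readTopic().writeStream.format("console").start()
val queryB = readTopic().writeStream.format("console").start()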
Upvotes: 4
Reputation: 3209
You can set a consumer group id for streaming as below; note that Kafka consumer properties must carry the kafka. prefix, and that this requires Spark 3.0 or later:
String processingGroup = "processingGroupA";
Dataset<Row> raw_df = sparkSession
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", consumerAppProperties.getProperty(BOOTSTRAP_SERVERS_CONFIG))
    .option("subscribe", topicName)
    .option("startingOffsets", "latest")
    // A plain "group.id" option is not passed to the consumer; Kafka
    // properties need the "kafka." prefix (supported since Spark 3.0).
    .option("kafka.group.id", processingGroup)
    .load();
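Note, however, that this should be used with caution: if two queries run with the same group id, Kafka will split the topic's partitions between them (as the source comment quoted in the previous answer explains), so each query would only see part of the data. Use a distinct group id per query.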
Upvotes: -2