Reputation: 101
I have a Spark Streaming job that's reading from Kafka and saving the data into Redshift.
Each batch RDD contains data with a "group_id" column, but the code below isn't running the foreach concurrently; it runs serially in YARN client mode.
YARN environment:
inputDstream.foreachRDD { eventRdd: RDD[Event] =>
  ...
  // Convert eventRdd to eventDF
  val groupIds = eventDF.select("group_id").distinct.collect.flatMap(_.toSeq)

  groupIds.par.foreach { groupId =>
    val teventDF = eventDF.where($"group_id" <=> groupId)
    val teventDFWithVersion = teventDF.withColumn("schema_id", lit(version))
    teventDFWithVersion.write
      .format("io.github.spark_redshift_community.spark.redshift")
      .options(opts)
      .mode("Append")
      .save()
  }
}
Again, the operation inside groupIds.par.foreach runs serially instead of in parallel. As the number of groups increases, my application starts to choke and processing time spikes.
How do I get Spark to save my batches of data concurrently?
Upvotes: 0
Views: 139
Reputation: 101
The driver is running on an m5.large (2 vCPUs), but only 1 vCPU is available to the driver application since other services are occupying the other core.
array.par.foreach{} runs concurrently based on the number of vCPUs available to the driver. Running the driver with more CPUs allows more concurrent writes.
Solution: run the Spark driver application in client mode on a machine with more CPUs, or run the application in cluster mode with --driver-cores 4.
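If resizing the driver isn't an option, the parallelism of the Scala parallel collection can also be set explicitly instead of relying on the default pool, which is sized from the driver's available processors. A minimal sketch, assuming Scala 2.12 (where ForkJoinTaskSupport wraps a java.util.concurrent.ForkJoinPool); the pool size of 8 is just an illustrative value:

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

val parGroupIds = groupIds.par
// By default .par uses a pool sized from the driver JVM's available
// processors, which is why a 1-vCPU driver effectively serializes the writes.
// Give the collection an explicit pool so write concurrency no longer
// depends on the driver's core count (8 is an illustrative value).
parGroupIds.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
parGroupIds.foreach { groupId =>
  // same per-group write to Redshift as in the question
}

The writes are still submitted from the driver, so it needs enough cores and memory to keep the concurrent jobs scheduled, which is why adding driver cores helped in my case.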
Upvotes: 0