gjin

Reputation: 929

Spark shuffling data before insert

CalcDf().show results in 13 stages (0-12), plus 1 more (13) for the show itself.

When I try to write the result to a table, I assume there should only be 13 stages (0-12), but instead I see an additional stage (13). Where does it come from and what does it do? I'm not performing any repartition or other operation that would require a shuffle. As far as I understand, Spark should just write 1100 files into the table, but that's not what's happening.

CalcDf()
  .write
  .mode(SaveMode.Overwrite)
  .insertInto("tn")

CalcDf() logic:

val dim = spark.sparkContext.broadcast(
  spark.table("dim")
    .as[Dim]
    .map(r => r.id -> r.col)
    .collect().toMap
)

spark.table("table")
  .as[CustomCC]
  .groupByKey(_.id)
  .flatMapGroups { case (k, iterator) => CustomCC.mapRows(iterator, dim) }
  .withColumn("time_key", lit("2021-07-01"))

Upvotes: 1

Views: 167

Answers (1)

Vikas

Reputation: 305

The previous stage (#12) has done a shuffle write, so any subsequent stage has to read that data via a shuffle read, which is what you see in stage #13.

Why is there an additional stage?

Because stage #12 ends with a shuffle write, not an output.

To understand what stage #12 does, please add information about how CalcDf is built.
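
In the meantime, a quick way to locate that shuffle boundary yourself (a minimal sketch, assuming CalcDf() returns the Dataset you are inserting):

CalcDf().explain()
// any Exchange node in the printed physical plan is a shuffle boundary:
// the stage before it ends with a shuffle write and the stage after it
// starts with a shuffle read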

EDIT

groupByKey does a shuffle write so that rows with the same id land on the same executor JVM.

Stage #13 reads this shuffled data and computes the map operation afterwards.
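
Mapping that onto the code from the question (a sketch using the same names as the question — spark.table("table"), CustomCC, dim, mapRows — only the stage comments are new):

spark.table("table")
  .as[CustomCC]
  .groupByKey(_.id)          // shuffle boundary: stage #12 ends here with a shuffle write keyed by id
  .flatMapGroups { case (_, iterator) => CustomCC.mapRows(iterator, dim) }  // stage #13 starts with a shuffle read of that data
  .withColumn("time_key", lit("2021-07-01"))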

The difference in task count can be attributed to the action. show() hasn't read the whole shuffled data, probably because it only displays 20 rows (the default), whereas insertInto(...) operates on the whole dataset, so it reads everything.
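
A rough way to see that difference in the plan (a sketch; show() is roughly a take of the first rows under the hood, and the exact limit operator name can vary between Spark versions):

CalcDf().limit(20).explain()   // show() is planned with a limit like this, so only part of the shuffled data has to be read
// insertInto(...) has no such limit, so stage #13 must read every shuffle
// partition that stage #12 wrote before it can write the files.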

Stage #13 isn't there just because files are being written; it is actually doing the computation.

Upvotes: 1
