Reputation: 1388
I am running HDFS and Spark locally and trying to understand how Spark persistence works. My objective is to store a joined dataset in memory and then run queries against it on the fly. However, my queries seem to be redoing the join rather than simply scanning through the persisted pre-joined dataset.
I have created and persisted two dataframes, let's say df1 and df2, by loading in two CSV files from HDFS. I persist a join of the two dataframes in memory:
val result = df1.join(df2, "USERNAME")
result.persist()
result.count()
I then define some operations on top of result:
val result2 = result.select("FOO", "BAR").groupBy("FOO").sum("BAR")
result2.show()
'result2' does not piggy back on the persisted result and redoes the join on its own. Here are the physical plans for result and result2:
== Physical Plan for result ==
InMemoryColumnarTableScan [...], (InMemoryRelation [...], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [...]), None)
== Physical Plan for result2 ==
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Final,isDistinct=false)], output=[FOO#2,sum(BAR)#837])
TungstenExchange hashpartitioning(FOO#2)
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Partial,isDistinct=false)], output=[FOO#2,currentSum#1311])
InMemoryColumnarTableScan [FOO#2,BAR#10], (InMemoryRelation [...], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [...]), None)
I would naively assume that since the join is already done and partitioned in memory, the second operation would simply consist of aggregation operations on each partition. It should be more expensive to redo the join from scratch. Am I assuming incorrectly or doing something wrong? Also, is this the right pattern for retaining a joined dataset for later querying?
Edit: For the record, the second query became a lot more performant after I turned down the number of shuffle partitions. By default, spark.sql.shuffle.partitions is set to 200. Simply setting it to one on my local instance considerably improved performance.
Upvotes: 1
Views: 3112
Reputation: 680
If we look at the the plan, we'll see that Spark actually is making use of the cached data and not redoing the join. Starting from the bottom up:
This is Spark reading the data from your cache:
InMemoryColumnarTableScan [FOO#2,BAR#10], (InMemoryRelation ...
This is Spark aggregating BAR by FOO in each partition - look for mode=Partial
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Partial ...
This is Spark shuffling the data from each partition of the previous step:
TungstenExchange hashpartitioning(FOO#2)
This is Spark aggregating the shuffled partition sums - look for mode=Final
TungstenAggregate(key=[FOO#2], functions=[(sum(cast(BAR#10 as double)),mode=Final ...
Reading these plans is a bit of a pain so if you have access to the SQL tab of the Spark UI (I think 1.5+), I'd recommend using that instead.
Upvotes: 4