Using SQLTransformers we can create new columns in a DataFrame, and we can also chain several SQLTransformers in a Pipeline. We can do the same thing with multiple calls to the selectExpr method on a DataFrame.
But are the performance optimizations that are applied to the selectExpr calls also applied to a Pipeline of SQLTransformers?
For example, consider the two snippets of code below:
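# Method 1: chained selectExpr calls
# (assumes `spark` is an existing SparkSession, as in the pyspark shell)
df = spark.table("transactions")
df = df.selectExpr("*", "sum(amt) over (partition by account) as acc_sum")
df = df.selectExpr("*", "sum(amt) over (partition by dt) as dt_sum")
df.show(10)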
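# Method 2: a Pipeline of SQLTransformers
from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer

df = spark.table("transactions")
trans1 = SQLTransformer(statement="SELECT *, sum(amt) over (partition by account) as acc_sum from __THIS__")
trans2 = SQLTransformer(statement="SELECT *, sum(amt) over (partition by dt) as dt_sum from __THIS__")
pipe = Pipeline(stages=[trans1, trans2])
transPipe = pipe.fit(df)  # no estimators here, so fit() just wraps the stages in a PipelineModel
transPipe.transform(df).show(10)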
Will both of these ways of computing the same thing perform the same?
Or are there extra optimizations applied to Method 1 that are not applied to Method 2?
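For context, SQLTransformer works roughly like this: it registers the input DataFrame as a temporary view under a generated name, replaces __THIS__ in the statement with that name, and runs the rewritten SQL through spark.sql. The pure-Python sketch below mimics only the string-rewriting step; rewrite_statement and its naming scheme are hypothetical and only approximate the generated names Spark uses (such as SQLTransformer_4318bd7007cefbf17a97_826abb6c003c).

```python
import uuid
from typing import Tuple

THIS_PLACEHOLDER = "__THIS__"


def rewrite_statement(statement: str, uid: str) -> Tuple[str, str]:
    """Hypothetical helper mimicking SQLTransformer's __THIS__ substitution.

    Returns (view_name, rewritten_sql). The view name is `uid` plus a random
    12-character hex suffix; this only approximates Spark's own naming.
    """
    view_name = f"{uid}_{uuid.uuid4().hex[-12:]}"
    return view_name, statement.replace(THIS_PLACEHOLDER, view_name)


# In PySpark the rewritten SQL would then be run roughly as:
#   df.createOrReplaceTempView(view_name)
#   result = spark.sql(rewritten_sql)
```

Because the transformer ends up issuing an ordinary SQL query against a view of the input, Catalyst sees essentially the same query it would get from selectExpr.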
No, there are no additional optimizations. As always, when in doubt, check the execution plan:
df = spark.createDataFrame([(1, 1, 1)], ("amt", "account", "dt"))
(df
    .selectExpr("*", "sum(amt) over (partition by account) as acc_sum")
    .selectExpr("*", "sum(amt) over (partition by dt) as dt_sum")
    .explain(True))
generates:
== Parsed Logical Plan ==
'Project [*, 'sum('amt) windowspecdefinition('dt, unspecifiedframe$()) AS dt_sum#165]
+- AnalysisBarrier Project [amt#22L, account#23L, dt#24L, acc_sum#158L]
== Analyzed Logical Plan ==
amt: bigint, account: bigint, dt: bigint, acc_sum: bigint, dt_sum: bigint
Project [amt#22L, account#23L, dt#24L, acc_sum#158L, dt_sum#165L]
+- Project [amt#22L, account#23L, dt#24L, acc_sum#158L, dt_sum#165L, dt_sum#165L]
   +- Window [sum(amt#22L) windowspecdefinition(dt#24L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS dt_sum#165L], [dt#24L]
      +- Project [amt#22L, account#23L, dt#24L, acc_sum#158L]
         +- Project [amt#22L, account#23L, dt#24L, acc_sum#158L]
            +- Project [amt#22L, account#23L, dt#24L, acc_sum#158L, acc_sum#158L]
               +- Window [sum(amt#22L) windowspecdefinition(account#23L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS acc_sum#158L], [account#23L]
                  +- Project [amt#22L, account#23L, dt#24L]
                     +- LogicalRDD [amt#22L, account#23L, dt#24L], false
== Optimized Logical Plan ==
Window [sum(amt#22L) windowspecdefinition(dt#24L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS dt_sum#165L], [dt#24L]
+- Window [sum(amt#22L) windowspecdefinition(account#23L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS acc_sum#158L], [account#23L]
   +- LogicalRDD [amt#22L, account#23L, dt#24L], false
== Physical Plan ==
Window [sum(amt#22L) windowspecdefinition(dt#24L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS dt_sum#165L], [dt#24L]
+- *Sort [dt#24L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(dt#24L, 200)
      +- Window [sum(amt#22L) windowspecdefinition(account#23L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS acc_sum#158L], [account#23L]
         +- *Sort [account#23L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(account#23L, 200)
               +- Scan ExistingRDD[amt#22L,account#23L,dt#24L]
while
trans2.transform(trans1.transform(df)).explain(True)
generates:
== Parsed Logical Plan ==
'Project [*, 'sum('amt) windowspecdefinition('dt, unspecifiedframe$()) AS dt_sum#150]
+- 'UnresolvedRelation `SQLTransformer_4318bd7007cefbf17a97_826abb6c003c`
== Analyzed Logical Plan ==
amt: bigint, account: bigint, dt: bigint, acc_sum: bigint, dt_sum: bigint
Project [amt#22L, account#23L, dt#24L, acc_sum#120L, dt_sum#150L]
+- Project [amt#22L, account#23L, dt#24L, acc_sum#120L, dt_sum#150L, dt_sum#150L]
   +- Window [sum(amt#22L) windowspecdefinition(dt#24L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS dt_sum#150L], [dt#24L]
      +- Project [amt#22L, account#23L, dt#24L, acc_sum#120L]
         +- SubqueryAlias sqltransformer_4318bd7007cefbf17a97_826abb6c003c
            +- Project [amt#22L, account#23L, dt#24L, acc_sum#120L]
               +- Project [amt#22L, account#23L, dt#24L, acc_sum#120L, acc_sum#120L]
                  +- Window [sum(amt#22L) windowspecdefinition(account#23L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS acc_sum#120L], [account#23L]
                     +- Project [amt#22L, account#23L, dt#24L]
                        +- SubqueryAlias sqltransformer_4688bba599a7f5a09c39_f5e9d251099e
                           +- LogicalRDD [amt#22L, account#23L, dt#24L], false
== Optimized Logical Plan ==
Window [sum(amt#22L) windowspecdefinition(dt#24L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS dt_sum#150L], [dt#24L]
+- Window [sum(amt#22L) windowspecdefinition(account#23L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS acc_sum#120L], [account#23L]
   +- LogicalRDD [amt#22L, account#23L, dt#24L], false
== Physical Plan ==
Window [sum(amt#22L) windowspecdefinition(dt#24L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS dt_sum#150L], [dt#24L]
+- *Sort [dt#24L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(dt#24L, 200)
      +- Window [sum(amt#22L) windowspecdefinition(account#23L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS acc_sum#120L], [account#23L]
         +- *Sort [account#23L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(account#23L, 200)
               +- Scan ExistingRDD[amt#22L,account#23L,dt#24L]
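As you can see, the optimized and physical plans are identical; only the expression IDs differ (for example acc_sum#158L vs acc_sum#120L). The analyzed plan of the SQLTransformer version merely contains extra SubqueryAlias nodes for the temporary views it queries, and the optimizer removes those along with the redundant Project nodes.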
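If you want to check this programmatically rather than by eye, one option is to capture the text printed by explain(True), pull out one section and mask the expression IDs before comparing. In the PySpark versions I know of, explain() writes the plan with Python's print, so contextlib.redirect_stdout should capture it, but treat that as an assumption. The helpers plan_section, normalize_ids and same_plan are hypothetical names for a minimal text-comparison sketch, not a Spark API.

```python
import re


def plan_section(explain_text: str, header: str) -> str:
    """Return the lines under '== <header> ==' up to the next '== ... ==' line."""
    section, inside = [], False
    for line in explain_text.splitlines():
        stripped = line.strip()
        if stripped.startswith("== ") and stripped.endswith(" =="):
            inside = stripped == f"== {header} =="
            continue
        if inside:
            section.append(line.rstrip())
    return "\n".join(section).strip()


def normalize_ids(plan: str) -> str:
    """Mask expression IDs (acc_sum#158L -> acc_sum#NL), which differ per query."""
    return re.sub(r"#\d+", "#N", plan)


def same_plan(explain_a: str, explain_b: str, header: str = "Optimized Logical Plan") -> bool:
    """Compare one section of two explain(True) outputs, ignoring expression IDs."""
    return normalize_ids(plan_section(explain_a, header)) == normalize_ids(
        plan_section(explain_b, header)
    )


# Usage sketch (needs a running SparkSession, so it is only shown as comments):
#   import contextlib, io
#   buf = io.StringIO()
#   with contextlib.redirect_stdout(buf):
#       method1_df.explain(True)
#   explain_1 = buf.getvalue()
```

Masking every ID is deliberately coarse: it would also hide a case where two plans reference different attributes that happen to share a name, so use it as a quick sanity check rather than a proof.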