John Lin

Reputation: 1220

Spark SQL RANK() over ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING fails

I encountered a SQL clause for which Spark SQL behaves differently (a bug?) from other engines (I compared with Hive).

You can copy and paste the following statements into the Hive shell to test:

hive>
CREATE TABLE t (v INT);
INSERT INTO t (v) VALUES (11), (21), (31), (42), (52);
SELECT v % 10 AS d, v, RANK() OVER (PARTITION BY v % 10 ORDER BY v ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS rank FROM t;

hive>

The result is what we expect:

1       11      1
1       21      2
1       31      3
2       42      1
2       52      2

However, when testing the following equivalent code in spark-shell,

scala>
Seq(11, 21, 31, 42, 52).toDF("v").createOrReplaceTempView("t")
spark.sql("SELECT v % 10 AS d, v, RANK() OVER (PARTITION BY v % 10 ORDER BY v ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS rank FROM t").show

scala>

we get an exception:

org.apache.spark.sql.AnalysisException: Window Frame ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW;
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$30$$anonfun$applyOrElse$11.applyOrElse(Analyzer.scala:2153)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$30$$anonfun$applyOrElse$11.applyOrElse(Analyzer.scala:2149)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:293)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:293)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:258)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:249)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$30.applyOrElse(Analyzer.scala:2149)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$30.applyOrElse(Analyzer.scala:2148)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$.apply(Analyzer.scala:2148)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$.apply(Analyzer.scala:2147)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
  at scala.collection.immutable.List.foldLeft(List.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
  ... 48 elided

This happens in both Spark 2.2.0 and Spark 2.3.0.

Is this a bug, or am I misunderstanding something?

P.S. I've also tested the following functions (by replacing RANK() in the query); the same split reproduces through the DataFrame API, as sketched after the list.

ROW_NUMBER()    => same exception
DENSE_RANK()    => same exception
CUME_DIST()     => same exception
PERCENT_RANK()  => same exception
NTILE()         => same exception
LEAD(v)         => same exception
LAG(v)          => same exception
FIRST_VALUE(v)  => OK
LAST_VALUE(v)   => OK
COUNT(v)        => OK
SUM(v)          => OK
AVG(v)          => OK
MEAN(v)         => OK
MIN(v)          => OK
MAX(v)          => OK
VARIANCE(v)     => OK
STDDEV(v)       => OK
COLLECT_LIST(v) => OK
COLLECT_SET(v)  => OK
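
For reference, here is a minimal DataFrame API check in spark-shell (the window spec below is my translation of the SQL frame; `t` is the temp view registered above):

scala>
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{rank, sum}
val w = Window.partitionBy($"v" % 10).orderBy($"v").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
// Aggregates accept the explicit full-partition frame:
spark.table("t").select(($"v" % 10).as("d"), $"v", sum($"v").over(w).as("s")).show
// rank().over(w) raises the same AnalysisException as the SQL version.

scala>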

Upvotes: 2

Views: 7246

Answers (1)

Jaishree Rout

Reputation: 392

ROW_NUMBER()    => same exception
DENSE_RANK()    => same exception
CUME_DIST()     => same exception
PERCENT_RANK()  => same exception
NTILE()         => same exception
LEAD(v)         => same exception
LAG(v)          => same exception

My first point: do you know why we use ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING? You first have to understand what that frame specification means.

It defines the window frame: the set of rows, from the first row of the partition to the last, that the function computes over. Now ask what that frame would mean for ROW_NUMBER(), LEAD(), LAG(), or the other analytic functions above: their results are determined by a row's position in the ordering, not by a frame of surrounding rows. If you just want a row number, why specify ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING at all?
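
To see the difference, compare with an aggregate over the same frame (a quick sketch against your table t): with UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING every row sees the whole partition, so the aggregate returns the partition total on every row.

scala>
// SUM over the full-partition frame: every row with d=1 gets 11+21+31 = 63,
// and every row with d=2 gets 42+52 = 94.
spark.sql("SELECT v % 10 AS d, v, SUM(v) OVER (PARTITION BY v % 10 ORDER BY v ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS s FROM t").show

scala>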

That is what the error is telling you: the frame you supplied does not match the frame these functions require.
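
So the practical fix is to drop the explicit frame and let Spark use the frame it requires for ranking functions (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, as the error message states); a minimal sketch:

scala>
spark.sql("SELECT v % 10 AS d, v, RANK() OVER (PARTITION BY v % 10 ORDER BY v) AS rank FROM t").show
// Produces the same ranking Hive returned above.

scala>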

Upvotes: 1
