Reputation: 5334
/Downloads/spark-3.0.1-bin-hadoop2.7/bin$ ./spark-shell
20/09/23 10:58:45 WARN Utils: Your hostname, byte-nihal resolves to a loopback address: 127.0.1.1; using 192.168.2.103 instead (on interface enp2s0)
20/09/23 10:58:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/09/23 10:58:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.2.103:4040
Spark context available as 'sc' (master = local[*], app id = local-1600838949311).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_265)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> println(countDistinct("x"))
count(x)
scala> println(sumDistinct("x"))
sum(DISTINCT x)
scala> println(sum("x"))
sum(x)
scala> println(count("x"))
count(x)
Question:
Is this some kind of bug, or is it intended functionality?
note: countDistinct gives the correct expression -> count(DISTINCT x) in Spark versions < 3.0
Upvotes: 2
Views: 355
Reputation: 2208
As @Shaido mentioned in the comment section, I have verified a few things which point to a bug in how the latest Spark version renders the expression string in toString (it may be a bug or an intentional change, I'm not completely sure).
Behaviour in Spark versions < 3.x:
import org.apache.spark.sql.functions._
println(countDistinct("x")) ---> gives output as count(DISTINCT x)
If we look at the source code of countDistinct("x") in particular:
def countDistinct(columnName: String, columnNames: String*): Column =
  countDistinct(Column(columnName), columnNames.map(Column.apply) : _*)

def countDistinct(expr: Column, exprs: Column*): Column = {
  withAggregateFunction(Count.apply((expr +: exprs).map(_.expr)), isDistinct = true)
}
As you can see, the second overloaded method wraps the Count aggregate function with isDistinct = true, so only distinct values are counted.
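The delegation pattern can be illustrated with a small standalone sketch (hypothetical names and a toy Column type, not the real Spark API): the String overload simply wraps the column names into Columns and forwards to the Column overload.

```scala
// Hypothetical sketch of the overload-delegation pattern, using a toy
// Column type instead of Spark's.
case class Column(name: String)

// String overload: wrap names into Columns and forward.
def describeDistinct(columnName: String, columnNames: String*): String =
  describeDistinct(Column(columnName), columnNames.map(Column.apply): _*)

// Column overload: build the rendered expression string.
def describeDistinct(expr: Column, exprs: Column*): String =
  s"count(DISTINCT ${(expr +: exprs).map(_.name).mkString(", ")})"

println(describeDistinct("x", "y")) // count(DISTINCT x, y)
```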
private def withAggregateFunction(
    func: AggregateFunction,
    isDistinct: Boolean = false): Column = {
  Column(func.toAggregateExpression(isDistinct))
}
If you check the signature of withAggregateFunction, it returns a Column, and the toString method of Column renders the expression via:
def toPrettySQL(e: Expression): String = usePrettyExpression(e).sql
This calls the .sql method on the AggregateExpression, which in turn delegates to the sql method of the aggregate function, as per the code below:
override def sql: String = aggregateFunction.sql(isDistinct)
In our case the AggregateFunction is Count.
def sql(isDistinct: Boolean): String = {
  val distinct = if (isDistinct) "DISTINCT " else ""
  s"$prettyName($distinct${children.map(_.sql).mkString(", ")})"
}
As per the above code, it should return count(DISTINCT x).
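This rendering can be reproduced with a minimal, self-contained sketch (not the real Spark class, just the same string-building logic as Count.sql above):

```scala
// Minimal standalone sketch reproducing the formatting logic of Count.sql:
// the DISTINCT keyword is emitted whenever isDistinct is set.
def countSql(children: Seq[String], isDistinct: Boolean): String = {
  val distinct = if (isDistinct) "DISTINCT " else ""
  s"count($distinct${children.mkString(", ")})"
}

println(countSql(Seq("x"), isDistinct = true))  // count(DISTINCT x)
println(countSql(Seq("x"), isDistinct = false)) // count(x)
```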
Now, in Spark versions >= 3.x, I have checked the source code and the toString behavior is a little different.
@scala.annotation.varargs
def countDistinct(expr: Column, exprs: Column*): Column =
  // For usage like countDistinct("*"), we should let analyzer expand star and
  // resolve function.
  Column(UnresolvedFunction("count", (expr +: exprs).map(_.expr), isDistinct = true))
It now uses UnresolvedFunction instead of withAggregateFunction.
The toString method of UnresolvedFunction is pretty straightforward:
override def toString: String = s"'$name(${children.mkString(", ")})"
which interpolates only the function name and its children, never the isDistinct flag; that is why you are getting count(x) instead of count(DISTINCT x).
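To see the difference, here is a minimal standalone sketch (again not the real Spark class) mirroring the toString above. The isDistinct flag is carried on the node but never consulted when the string is built, and the leading tick in the interpolation just marks the function as unresolved:

```scala
// Hypothetical standalone version of UnresolvedFunction with the same
// toString logic as quoted above: isDistinct never appears in the output.
case class UnresolvedFunction(name: String, children: Seq[String], isDistinct: Boolean) {
  override def toString: String = s"'$name(${children.mkString(", ")})"
}

println(UnresolvedFunction("count", Seq("x"), isDistinct = true)) // 'count(x)
```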
Upvotes: 3