Reputation: 2318
I am trying to apply a window function to a Spark DataFrame using rangeBetween on a column of type Long, but the results of the window are not correct. Am I doing something wrong?
Here is my DataFrame:
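import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{asc, col, count}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val rowsRdd: RDD[Row] = spark.sparkContext.parallelize(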
Seq(
Row("2014-11-01 08:10:10.12345", 141482941012345L),
Row("2014-11-01 09:10:10.12345", 141483301012345L),
Row("2014-11-01 10:10:10.12345", 141483661012345L),
Row("2014-11-02 10:10:10.12345", 141492301012345L),
Row("2014-11-03 10:10:10.12345", 141500941012345L),
Row("2014-11-04 10:10:10.12345", 141509581012345L),
Row("2014-11-05 10:10:10.12345", 141518221012345L),
Row("2014-11-06 10:10:10.12345", 141526861012345L),
Row("2014-11-07 10:10:10.12345", 141535501012345L),
Row("2014-11-08 10:10:10.12345", 141544141012345L)
)
)
val schema = new StructType()
.add(StructField("dateTime", StringType, true))
.add(StructField("unixTime", LongType, true))
val df = spark.createDataFrame(rowsRdd, schema)
df.show(10, false)
df.printSchema()
Which is:
+-------------------------+---------------+
|dateTime |unixTime |
+-------------------------+---------------+
|2014-11-01 08:10:10.12345|141482941012345|
|2014-11-01 09:10:10.12345|141483301012345|
|2014-11-01 10:10:10.12345|141483661012345|
|2014-11-02 10:10:10.12345|141492301012345|
|2014-11-03 10:10:10.12345|141500941012345|
|2014-11-04 10:10:10.12345|141509581012345|
|2014-11-05 10:10:10.12345|141518221012345|
|2014-11-06 10:10:10.12345|141526861012345|
|2014-11-07 10:10:10.12345|141535501012345|
|2014-11-08 10:10:10.12345|141544141012345|
+-------------------------+---------------+
Schema:
root
|-- dateTime: string (nullable = true)
|-- unixTime: long (nullable = true)
The first column is the event timestamp as a string (we will not use it in practice), and the second column is the corresponding Unix time in units of 1e-5 seconds (10 microseconds).
Now I want to count the events in the window preceding the current row. For example, for a 3-hour window I do:
val hour: Long = 60*60*100000L
val w = Window.orderBy(col("unixTime")).rangeBetween(-3*hour, 0)
val df2 = df.withColumn("cts", count(col("dateTime")).over(w)).orderBy(asc("unixTime"))
Which returns correctly:
+-------------------------+---------------+---+
|dateTime |unixTime |cts|
+-------------------------+---------------+---+
|2014-11-01 08:10:10.12345|141482941012345|1 |
|2014-11-01 09:10:10.12345|141483301012345|2 |
|2014-11-01 10:10:10.12345|141483661012345|3 |
|2014-11-02 10:10:10.12345|141492301012345|1 |
|2014-11-03 10:10:10.12345|141500941012345|1 |
|2014-11-04 10:10:10.12345|141509581012345|1 |
|2014-11-05 10:10:10.12345|141518221012345|1 |
|2014-11-06 10:10:10.12345|141526861012345|1 |
|2014-11-07 10:10:10.12345|141535501012345|1 |
|2014-11-08 10:10:10.12345|141544141012345|1 |
+-------------------------+---------------+---+
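As a cross-check that does not depend on Spark, the expected counts can be recomputed in plain Scala. This is only a sketch: the object name `ExpectedCounts` and the helper `countsWithin` are made up for illustration and are not part of any Spark API. It treats the frame as the closed interval [t - window, t], which is how a range frame ending at the current row is normally read.

```scala
object ExpectedCounts {
  // One hour expressed in units of 1e-5 seconds, as in the question.
  val hour: Long = 60 * 60 * 100000L

  // The unixTime column from the question's DataFrame.
  val unixTimes: Seq[Long] = Seq(
    141482941012345L, 141483301012345L, 141483661012345L,
    141492301012345L, 141500941012345L, 141509581012345L,
    141518221012345L, 141526861012345L, 141535501012345L,
    141544141012345L
  )

  // Hypothetical helper: for each event at time t, count the events
  // whose time lies in the closed interval [t - window, t].
  def countsWithin(times: Seq[Long], window: Long): Seq[Int] =
    times.map(t => times.count(u => u >= t - window && u <= t))

  def main(args: Array[String]): Unit =
    // prints 1, 2, 3, 1, 1, 1, 1, 1, 1, 1
    println(countsWithin(unixTimes, 3 * hour).mkString(", "))
}
```

Because the first three events are one hour apart and the remaining ones are 24 hours apart, the same helper gives the same counts for any window from 2 hours up to just under 24 hours.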
Here is the result for a 6-hour window. Why are all the results 0 now?
val hour: Long = 60*60*100000L
val w = Window.orderBy(col("unixTime")).rangeBetween(-6*hour, 0)
val df2 = df.withColumn("cts", count(col("dateTime")).over(w)).orderBy(asc("unixTime"))
+-------------------------+---------------+---+
|dateTime |unixTime |cts|
+-------------------------+---------------+---+
|2014-11-01 08:10:10.12345|141482941012345|0 |
|2014-11-01 09:10:10.12345|141483301012345|0 |
|2014-11-01 10:10:10.12345|141483661012345|0 |
|2014-11-02 10:10:10.12345|141492301012345|0 |
|2014-11-03 10:10:10.12345|141500941012345|0 |
|2014-11-04 10:10:10.12345|141509581012345|0 |
|2014-11-05 10:10:10.12345|141518221012345|0 |
|2014-11-06 10:10:10.12345|141526861012345|0 |
|2014-11-07 10:10:10.12345|141535501012345|0 |
|2014-11-08 10:10:10.12345|141544141012345|0 |
+-------------------------+---------------+---+
Here is what happens for a 12-hour window. Why are all the results 1 now?
val hour: Long = 60*60*100000L
val w = Window.orderBy(col("unixTime")).rangeBetween(-12*hour, 0)
val df2 = df.withColumn("cts", count(col("dateTime")).over(w)).orderBy(asc("unixTime"))
+-------------------------+---------------+---+
|dateTime |unixTime |cts|
+-------------------------+---------------+---+
|2014-11-01 08:10:10.12345|141482941012345|1 |
|2014-11-01 09:10:10.12345|141483301012345|1 |
|2014-11-01 10:10:10.12345|141483661012345|1 |
|2014-11-02 10:10:10.12345|141492301012345|1 |
|2014-11-03 10:10:10.12345|141500941012345|1 |
|2014-11-04 10:10:10.12345|141509581012345|1 |
|2014-11-05 10:10:10.12345|141518221012345|1 |
|2014-11-06 10:10:10.12345|141526861012345|1 |
|2014-11-07 10:10:10.12345|141535501012345|1 |
|2014-11-08 10:10:10.12345|141544141012345|1 |
+-------------------------+---------------+---+
What is going on here? It does not work correctly for any large rangeBetween value.
Edit: 9/11/2017
Is this related to the following issue: [SPARK-19451][SQL] rangeBetween method should accept Long value as boundary #18540? Is it already implemented in the latest version of Spark?
Upvotes: 2
Views: 2032
Reputation: 36
It is indeed related to the linked issue. 6 * hour is 2160000000, which is larger than Integer.MAX_VALUE (2147483647), so it results in integer overflow:
scala> (6 * hour).toInt
res4: Int = -2134967296
The issue has been fixed on current master and will be released in Spark 2.3.
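The same arithmetic also seems to account for the all-1 result with 12 hours. The sketch below is plain Scala with no Spark dependency, and it assumes the Long boundary is narrowed to Int as in the `.toInt` call above. The object name `RangeBoundOverflow` and the helper `narrowed` are hypothetical, introduced only for this illustration.

```scala
object RangeBoundOverflow {
  // One hour expressed in units of 1e-5 seconds, as in the question.
  val hour: Long = 60 * 60 * 100000L

  // Hypothetical helper: the lower bound -hours * hour after narrowing to Int.
  // This assumes the bound is truncated with toInt; it is not a Spark API.
  def narrowed(hours: Int): Int = (-hours * hour).toInt

  def main(args: Array[String]): Unit =
    for (h <- Seq(3, 6, 12)) {
      val asLong = -h * hour
      println(s"$h h: as Long = $asLong, as Int = ${narrowed(h)}")
    }
}
```

Under that assumption, the 3-hour bound (-1080000000) fits in an Int and is unchanged. The 6-hour bound wraps to +2134967296, so the frame starts after the current row and is empty, giving a count of 0. The 12-hour bound wraps to -25032704, about 250 seconds, so the frame contains only the current row, giving a count of 1.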
Upvotes: 2